From 08ab983d58c41e24434b77a9b30c1bad68d4eda4 Mon Sep 17 00:00:00 2001 From: Clint Wylie Date: Tue, 16 Apr 2019 17:28:44 -0700 Subject: [PATCH 01/10] now with 100% more buffer --- .../bloom/BaseBloomFilterAggregator.java | 11 +++- .../BaseBloomFilterBufferAggregator.java | 58 ++++++++++++++++++- .../bloom/BloomFilterAggregateCombiner.java | 21 ++++--- .../bloom/BloomFilterAggregatorFactory.java | 23 ++++---- .../bloom/BloomFilterMergeAggregator.java | 21 ++----- .../BloomFilterMergeAggregatorFactory.java | 3 +- .../bloom/DoubleBloomFilterAggregator.java | 12 +--- .../DoubleBloomFilterBufferAggregator.java | 8 +-- .../bloom/FloatBloomFilterAggregator.java | 12 +--- .../bloom/LongBloomFilterAggregator.java | 12 +--- .../LongBloomFilterBufferAggregator.java | 8 +-- .../bloom/NoopBloomFilterAggregator.java | 2 +- .../bloom/StringBloomFilterAggregator.java | 24 +------- .../bloom/BloomFilterAggregatorTest.java | 23 +++++--- .../sql/BloomFilterSqlAggregatorTest.java | 1 - .../druid/sql/calcite/util/CalciteTests.java | 2 - 16 files changed, 124 insertions(+), 117 deletions(-) diff --git a/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/BaseBloomFilterAggregator.java b/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/BaseBloomFilterAggregator.java index 652236b7c68a..9f22d537c0a8 100644 --- a/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/BaseBloomFilterAggregator.java +++ b/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/BaseBloomFilterAggregator.java @@ -24,16 +24,21 @@ import org.apache.druid.segment.BaseNullableColumnValueSelector; import javax.annotation.Nullable; +import java.nio.ByteBuffer; public abstract class BaseBloomFilterAggregator implements Aggregator { - final BloomKFilter collector; + protected final int maxNumEntries; protected final TSelector selector; + final ByteBuffer collector; - BaseBloomFilterAggregator(TSelector selector, BloomKFilter collector) + BaseBloomFilterAggregator(TSelector selector, int maxNumEntries) { - this.collector = collector; this.selector = selector; + this.maxNumEntries = maxNumEntries; + BloomKFilter bloomFilter = new BloomKFilter(maxNumEntries); + this.collector = ByteBuffer.allocate(BloomKFilter.computeSizeBytes(maxNumEntries)); + BloomKFilter.serialize(collector, bloomFilter); } @Nullable diff --git a/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/BaseBloomFilterBufferAggregator.java b/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/BaseBloomFilterBufferAggregator.java index ff866f9ffd65..2f0184c4f4bc 100644 --- a/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/BaseBloomFilterBufferAggregator.java +++ b/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/BaseBloomFilterBufferAggregator.java @@ -19,14 +19,20 @@ package org.apache.druid.query.aggregation.bloom; +import org.apache.druid.common.config.NullHandling; import org.apache.druid.query.aggregation.BufferAggregator; import org.apache.druid.query.filter.BloomKFilter; import org.apache.druid.query.monomorphicprocessing.RuntimeShapeInspector; +import org.apache.druid.segment.BaseDoubleColumnValueSelector; +import org.apache.druid.segment.BaseFloatColumnValueSelector; +import org.apache.druid.segment.BaseLongColumnValueSelector; import org.apache.druid.segment.BaseNullableColumnValueSelector; +import org.apache.druid.segment.DimensionSelector; import java.nio.ByteBuffer; -public abstract class BaseBloomFilterBufferAggregator implements BufferAggregator +public abstract class BaseBloomFilterBufferAggregator + implements BufferAggregator { protected final int maxNumEntries; protected final TSelector selector; @@ -64,7 +70,7 @@ public Object get(ByteBuffer buf, int position) ByteBuffer mutationBuffer = buf.duplicate(); mutationBuffer.position(position); // | k (byte) | numLongs (int) | bitset (long[numLongs]) | - int sizeBytes = 1 + Integer.BYTES + (buf.getInt(position + 1) * Long.BYTES); + int sizeBytes = BloomKFilter.computeSizeBytes(maxNumEntries); mutationBuffer.limit(position + sizeBytes); ByteBuffer resultCopy = ByteBuffer.allocate(sizeBytes); @@ -102,4 +108,52 @@ public void inspectRuntimeShape(RuntimeShapeInspector inspector) { inspector.visit("selector", selector); } + + public static void bufferAddFloat(ByteBuffer buffer, BaseFloatColumnValueSelector selector) + { + if (NullHandling.replaceWithDefault() || !selector.isNull()) { + BloomKFilter.addFloat(buffer, selector.getFloat()); + } else { + BloomKFilter.addBytes(buffer, null, 0, 0); + } + } + + public static void bufferAddLong(ByteBuffer buffer, BaseLongColumnValueSelector selector) + { + if (NullHandling.replaceWithDefault() || !selector.isNull()) { + BloomKFilter.addLong(buffer, selector.getLong()); + } else { + BloomKFilter.addBytes(buffer, null, 0, 0); + } + } + + public static void bufferAddDouble(ByteBuffer buffer, BaseDoubleColumnValueSelector selector) + { + if (NullHandling.replaceWithDefault() || !selector.isNull()) { + BloomKFilter.addDouble(buffer, selector.getDouble()); + } else { + BloomKFilter.addBytes(buffer, null, 0, 0); + } + } + + public static void bufferAddDim(ByteBuffer buffer, DimensionSelector selector) + { + if (selector.getRow().size() > 1) { + selector.getRow().forEach(v -> { + String value = selector.lookupName(v); + if (value == null) { + BloomKFilter.addBytes(buffer, null, 0, 0); + } else { + BloomKFilter.addString(buffer, value); + } + }); + } else { + String value = (String) selector.getObject(); + if (value == null) { + BloomKFilter.addBytes(buffer, null, 0, 0); + } else { + BloomKFilter.addString(buffer, value); + } + } + } } diff --git a/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/BloomFilterAggregateCombiner.java b/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/BloomFilterAggregateCombiner.java index 6fc4bf9379e4..e5daddea42d8 100644 --- a/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/BloomFilterAggregateCombiner.java +++ b/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/BloomFilterAggregateCombiner.java @@ -24,11 +24,12 @@ import org.apache.druid.segment.ColumnValueSelector; import javax.annotation.Nullable; +import java.nio.ByteBuffer; -public class BloomFilterAggregateCombiner extends ObjectAggregateCombiner +public class BloomFilterAggregateCombiner extends ObjectAggregateCombiner { @Nullable - private BloomKFilter combined; + private ByteBuffer combined; private final int maxNumEntries; @@ -47,26 +48,30 @@ public void reset(ColumnValueSelector selector) @Override public void fold(ColumnValueSelector selector) { - BloomKFilter other = (BloomKFilter) selector.getObject(); + ByteBuffer other = (ByteBuffer) selector.getObject(); if (other == null) { return; } if (combined == null) { - combined = new BloomKFilter(maxNumEntries); + BloomKFilter bloomFilter = new BloomKFilter(maxNumEntries); + ByteBuffer buffer = ByteBuffer.allocate(BloomKFilter.computeSizeBytes(maxNumEntries)); + BloomKFilter.serialize(buffer, bloomFilter); } - combined.merge(other); + + BloomKFilter.mergeBloomFilterByteBuffers(combined, 0, other, 0); } + @Nullable @Override - public BloomKFilter getObject() + public ByteBuffer getObject() { return combined; } @Override - public Class classOfObject() + public Class classOfObject() { - return BloomKFilter.class; + return ByteBuffer.class; } } diff --git a/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/BloomFilterAggregatorFactory.java b/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/BloomFilterAggregatorFactory.java index 42d379ec6bc7..3f10b8a9ee3d 100644 --- a/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/BloomFilterAggregatorFactory.java +++ b/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/BloomFilterAggregatorFactory.java @@ -59,10 +59,6 @@ public class BloomFilterAggregatorFactory extends AggregatorFactory BloomKFilter.getNumSetBits(buf1, buf1.position()), BloomKFilter.getNumSetBits(buf2, buf2.position()) ); - } else if (o1 instanceof BloomKFilter && o2 instanceof BloomKFilter) { - BloomKFilter o1f = (BloomKFilter) o1; - BloomKFilter o2f = (BloomKFilter) o2; - return Integer.compare(o1f.getNumSetBits(), o2f.getNumSetBits()); } else { throw new RE("Unable to compare unexpected types [%s]", o1.getClass().getName()); } @@ -104,13 +100,13 @@ public Aggregator factorize(ColumnSelectorFactory columnFactory) ValueType type = capabilities.getType(); switch (type) { case STRING: - return new StringBloomFilterAggregator(columnFactory.makeDimensionSelector(field), filter); + return new StringBloomFilterAggregator(columnFactory.makeDimensionSelector(field), maxNumEntries); case LONG: - return new LongBloomFilterAggregator(columnFactory.makeColumnValueSelector(field.getDimension()), filter); + return new LongBloomFilterAggregator(columnFactory.makeColumnValueSelector(field.getDimension()), maxNumEntries); case FLOAT: - return new FloatBloomFilterAggregator(columnFactory.makeColumnValueSelector(field.getDimension()), filter); + return new FloatBloomFilterAggregator(columnFactory.makeColumnValueSelector(field.getDimension()), maxNumEntries); case DOUBLE: - return new DoubleBloomFilterAggregator(columnFactory.makeColumnValueSelector(field.getDimension()), filter); + return new DoubleBloomFilterAggregator(columnFactory.makeColumnValueSelector(field.getDimension()), maxNumEntries); default: throw new IAE("Cannot create bloom filter aggregator for invalid column type [%s]", type); } @@ -168,7 +164,12 @@ public Object combine(@Nullable Object lhs, @Nullable Object rhs) if (lhs == null) { return rhs; } - ((BloomKFilter) lhs).merge((BloomKFilter) rhs); + BloomKFilter.mergeBloomFilterByteBuffers( + (ByteBuffer) lhs, + ((ByteBuffer) lhs).position(), + (ByteBuffer) rhs, + ((ByteBuffer) rhs).position() + ); return lhs; } @@ -195,7 +196,9 @@ public Object deserialize(Object object) { if (object instanceof String) { return ByteBuffer.wrap(StringUtils.decodeBase64String((String) object)); - } else { + } else if (object instanceof byte[]) { + return ByteBuffer.wrap((byte[]) object); + } else { return object; } } diff --git a/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/BloomFilterMergeAggregator.java b/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/BloomFilterMergeAggregator.java index 67d7a70cb418..d2574ee4f966 100644 --- a/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/BloomFilterMergeAggregator.java +++ b/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/BloomFilterMergeAggregator.java @@ -22,14 +22,13 @@ import org.apache.druid.query.filter.BloomKFilter; import org.apache.druid.segment.ColumnValueSelector; -import java.io.IOException; import java.nio.ByteBuffer; public final class BloomFilterMergeAggregator extends BaseBloomFilterAggregator> { - public BloomFilterMergeAggregator(ColumnValueSelector selector, BloomKFilter collector) + public BloomFilterMergeAggregator(ColumnValueSelector selector, int maxNumEntries) { - super(selector, collector); + super(selector, maxNumEntries); } @Override @@ -37,20 +36,8 @@ public void aggregate() { Object other = selector.getObject(); if (other != null) { - if (other instanceof BloomKFilter) { - collector.merge((BloomKFilter) other); - } else if (other instanceof ByteBuffer) { - // fun fact: because bloom filter agg factory deserialize returns a byte buffer to avoid unnecessary serde, - // but GroupByQueryEngine (group by v1) ends up trying to merge ByteBuffers from buffer aggs with this agg - // instead of the BloomFilterBufferMergeAggregator. fun! Also, it requires a 'ComplexMetricSerde' to be - // registered even for query time only aggs, but then never uses it. also fun! - try { - BloomKFilter otherFilter = BloomKFilter.deserialize((ByteBuffer) other); - collector.merge(otherFilter); - } - catch (IOException ioe) { - throw new RuntimeException("Failed to deserialize BloomKFilter", ioe); - } + if (other instanceof ByteBuffer) { + BloomKFilter.mergeBloomFilterByteBuffers(collector, 0, (ByteBuffer) other, 0); } } } diff --git a/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/BloomFilterMergeAggregatorFactory.java b/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/BloomFilterMergeAggregatorFactory.java index 8dab8676e1ce..90287495f343 100644 --- a/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/BloomFilterMergeAggregatorFactory.java +++ b/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/BloomFilterMergeAggregatorFactory.java @@ -25,7 +25,6 @@ import org.apache.druid.query.aggregation.AggregatorUtil; import org.apache.druid.query.aggregation.BufferAggregator; import org.apache.druid.query.cache.CacheKeyBuilder; -import org.apache.druid.query.filter.BloomKFilter; import org.apache.druid.segment.BaseNullableColumnValueSelector; import org.apache.druid.segment.ColumnSelectorFactory; import org.apache.druid.segment.ColumnValueSelector; @@ -53,7 +52,7 @@ public Aggregator factorize(final ColumnSelectorFactory metricFactory) if (selector instanceof NilColumnValueSelector) { throw new ISE("WTF?! Unexpected NilColumnValueSelector"); } - return new BloomFilterMergeAggregator((ColumnValueSelector) selector, new BloomKFilter(getMaxNumEntries())); + return new BloomFilterMergeAggregator((ColumnValueSelector) selector, getMaxNumEntries()); } @Override diff --git a/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/DoubleBloomFilterAggregator.java b/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/DoubleBloomFilterAggregator.java index dfdae6c20d6e..84d72baf3cf6 100644 --- a/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/DoubleBloomFilterAggregator.java +++ b/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/DoubleBloomFilterAggregator.java @@ -19,24 +19,18 @@ package org.apache.druid.query.aggregation.bloom; -import org.apache.druid.common.config.NullHandling; -import org.apache.druid.query.filter.BloomKFilter; import org.apache.druid.segment.BaseDoubleColumnValueSelector; public final class DoubleBloomFilterAggregator extends BaseBloomFilterAggregator { - DoubleBloomFilterAggregator(BaseDoubleColumnValueSelector selector, BloomKFilter collector) + DoubleBloomFilterAggregator(BaseDoubleColumnValueSelector selector, int maxNumEntries) { - super(selector, collector); + super(selector, maxNumEntries); } @Override public void aggregate() { - if (NullHandling.replaceWithDefault() || !selector.isNull()) { - collector.addDouble(selector.getDouble()); - } else { - collector.addBytes(null, 0, 0); - } + BaseBloomFilterBufferAggregator.bufferAddDouble(collector, selector); } } diff --git a/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/DoubleBloomFilterBufferAggregator.java b/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/DoubleBloomFilterBufferAggregator.java index e84b9fc70df4..0e5aeca36f17 100644 --- a/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/DoubleBloomFilterBufferAggregator.java +++ b/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/DoubleBloomFilterBufferAggregator.java @@ -19,8 +19,6 @@ package org.apache.druid.query.aggregation.bloom; -import org.apache.druid.common.config.NullHandling; -import org.apache.druid.query.filter.BloomKFilter; import org.apache.druid.segment.BaseDoubleColumnValueSelector; import java.nio.ByteBuffer; @@ -35,10 +33,6 @@ public final class DoubleBloomFilterBufferAggregator extends BaseBloomFilterBuff @Override public void bufferAdd(ByteBuffer buf) { - if (NullHandling.replaceWithDefault() || !selector.isNull()) { - BloomKFilter.addDouble(buf, selector.getDouble()); - } else { - BloomKFilter.addBytes(buf, null, 0, 0); - } + BaseBloomFilterBufferAggregator.bufferAddDouble(buf, selector); } } diff --git a/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/FloatBloomFilterAggregator.java b/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/FloatBloomFilterAggregator.java index ae53d165b96e..17dfe60ea9f7 100644 --- a/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/FloatBloomFilterAggregator.java +++ b/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/FloatBloomFilterAggregator.java @@ -19,24 +19,18 @@ package org.apache.druid.query.aggregation.bloom; -import org.apache.druid.common.config.NullHandling; -import org.apache.druid.query.filter.BloomKFilter; import org.apache.druid.segment.BaseFloatColumnValueSelector; public final class FloatBloomFilterAggregator extends BaseBloomFilterAggregator { - FloatBloomFilterAggregator(BaseFloatColumnValueSelector selector, BloomKFilter collector) + FloatBloomFilterAggregator(BaseFloatColumnValueSelector selector, int maxNumEntries) { - super(selector, collector); + super(selector, maxNumEntries); } @Override public void aggregate() { - if (NullHandling.replaceWithDefault() || !selector.isNull()) { - collector.addFloat(selector.getFloat()); - } else { - collector.addBytes(null, 0, 0); - } + BaseBloomFilterBufferAggregator.bufferAddFloat(collector, selector); } } diff --git a/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/LongBloomFilterAggregator.java b/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/LongBloomFilterAggregator.java index caa47397df11..8d6b934f7c35 100644 --- a/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/LongBloomFilterAggregator.java +++ b/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/LongBloomFilterAggregator.java @@ -19,24 +19,18 @@ package org.apache.druid.query.aggregation.bloom; -import org.apache.druid.common.config.NullHandling; -import org.apache.druid.query.filter.BloomKFilter; import org.apache.druid.segment.BaseLongColumnValueSelector; public final class LongBloomFilterAggregator extends BaseBloomFilterAggregator { - LongBloomFilterAggregator(BaseLongColumnValueSelector selector, BloomKFilter collector) + LongBloomFilterAggregator(BaseLongColumnValueSelector selector, int maxNumEntries) { - super(selector, collector); + super(selector, maxNumEntries); } @Override public void aggregate() { - if (NullHandling.replaceWithDefault() || !selector.isNull()) { - collector.addLong(selector.getLong()); - } else { - collector.addBytes(null, 0, 0); - } + BaseBloomFilterBufferAggregator.bufferAddLong(collector, selector); } } diff --git a/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/LongBloomFilterBufferAggregator.java b/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/LongBloomFilterBufferAggregator.java index 13a6634cda10..04c34fecb8bd 100644 --- a/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/LongBloomFilterBufferAggregator.java +++ b/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/LongBloomFilterBufferAggregator.java @@ -19,8 +19,6 @@ package org.apache.druid.query.aggregation.bloom; -import org.apache.druid.common.config.NullHandling; -import org.apache.druid.query.filter.BloomKFilter; import org.apache.druid.segment.BaseLongColumnValueSelector; import java.nio.ByteBuffer; @@ -35,10 +33,6 @@ public final class LongBloomFilterBufferAggregator extends BaseBloomFilterBuffer @Override public void bufferAdd(ByteBuffer buf) { - if (NullHandling.replaceWithDefault() || !selector.isNull()) { - BloomKFilter.addLong(buf, selector.getLong()); - } else { - BloomKFilter.addBytes(buf, null, 0, 0); - } + BaseBloomFilterBufferAggregator.bufferAddLong(buf, selector); } } diff --git a/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/NoopBloomFilterAggregator.java b/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/NoopBloomFilterAggregator.java index bc4d429a446d..928747adf2e7 100644 --- a/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/NoopBloomFilterAggregator.java +++ b/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/NoopBloomFilterAggregator.java @@ -26,7 +26,7 @@ public final class NoopBloomFilterAggregator extends BaseBloomFilterAggregator { - StringBloomFilterAggregator(DimensionSelector selector, BloomKFilter collector) + StringBloomFilterAggregator(DimensionSelector selector, int maxNumEntries) { - super(selector, collector); + super(selector, maxNumEntries); } @Override public void aggregate() { - // note: there might be room for optimization here but behavior must match BloomDimFilter implementation - if (selector.getRow().size() > 1) { - selector.getRow().forEach(v -> { - String value = selector.lookupName(v); - if (value == null) { - collector.addBytes(null, 0, 0); - } else { - collector.addString(value); - } - }); - } else { - String value = (String) selector.getObject(); - if (value == null) { - collector.addBytes(null, 0, 0); - } else { - collector.addString(value); - } - } + BaseBloomFilterBufferAggregator.bufferAddDim(collector, selector); } } diff --git a/extensions-core/druid-bloom-filter/src/test/java/org/apache/druid/query/aggregation/bloom/BloomFilterAggregatorTest.java b/extensions-core/druid-bloom-filter/src/test/java/org/apache/druid/query/aggregation/bloom/BloomFilterAggregatorTest.java index 790cf8c5c01d..21587a221d9b 100644 --- a/extensions-core/druid-bloom-filter/src/test/java/org/apache/druid/query/aggregation/bloom/BloomFilterAggregatorTest.java +++ b/extensions-core/druid-bloom-filter/src/test/java/org/apache/druid/query/aggregation/bloom/BloomFilterAggregatorTest.java @@ -241,7 +241,7 @@ public BloomFilterAggregatorTest() public void testAggregateValues() throws IOException { DimensionSelector dimSelector = new CardinalityAggregatorTest.TestDimensionSelector(values1, null); - StringBloomFilterAggregator agg = new StringBloomFilterAggregator(dimSelector, new BloomKFilter(maxNumValues)); + StringBloomFilterAggregator agg = new StringBloomFilterAggregator(dimSelector, maxNumValues); for (int i = 0; i < values1.size(); ++i) { aggregateDimension(Collections.singletonList(dimSelector), agg); @@ -256,7 +256,7 @@ public void testAggregateValues() throws IOException public void testAggregateLongValues() throws IOException { TestLongColumnSelector selector = new TestLongColumnSelector(Arrays.asList(longValues1)); - LongBloomFilterAggregator agg = new LongBloomFilterAggregator(selector, new BloomKFilter(maxNumValues)); + LongBloomFilterAggregator agg = new LongBloomFilterAggregator(selector, maxNumValues); for (Long ignored : longValues1) { aggregateColumn(Collections.singletonList(selector), agg); @@ -271,7 +271,7 @@ public void testAggregateLongValues() throws IOException public void testAggregateFloatValues() throws IOException { TestFloatColumnSelector selector = new TestFloatColumnSelector(Arrays.asList(floatValues1)); - FloatBloomFilterAggregator agg = new FloatBloomFilterAggregator(selector, new BloomKFilter(maxNumValues)); + FloatBloomFilterAggregator agg = new FloatBloomFilterAggregator(selector, maxNumValues); for (Float ignored : floatValues1) { aggregateColumn(Collections.singletonList(selector), agg); @@ -286,7 +286,7 @@ public void testAggregateFloatValues() throws IOException public void testAggregateDoubleValues() throws IOException { TestDoubleColumnSelector selector = new TestDoubleColumnSelector(Arrays.asList(doubleValues1)); - DoubleBloomFilterAggregator agg = new DoubleBloomFilterAggregator(selector, new BloomKFilter(maxNumValues)); + DoubleBloomFilterAggregator agg = new DoubleBloomFilterAggregator(selector, maxNumValues); for (Double ignored : doubleValues1) { aggregateColumn(Collections.singletonList(selector), agg); @@ -384,8 +384,8 @@ public void testCombineValues() throws IOException DimensionSelector dimSelector1 = new CardinalityAggregatorTest.TestDimensionSelector(values1, null); DimensionSelector dimSelector2 = new CardinalityAggregatorTest.TestDimensionSelector(values2, null); - StringBloomFilterAggregator agg1 = new StringBloomFilterAggregator(dimSelector1, new BloomKFilter(maxNumValues)); - StringBloomFilterAggregator agg2 = new StringBloomFilterAggregator(dimSelector2, new BloomKFilter(maxNumValues)); + StringBloomFilterAggregator agg1 = new StringBloomFilterAggregator(dimSelector1, maxNumValues); + StringBloomFilterAggregator agg2 = new StringBloomFilterAggregator(dimSelector2, maxNumValues); for (int i = 0; i < values1.size(); ++i) { aggregateDimension(Collections.singletonList(dimSelector1), agg1); @@ -409,10 +409,15 @@ public void testCombineValues() throws IOException public void testMergeValues() throws IOException { final TestBloomFilterColumnSelector mergeDim = - new TestBloomFilterColumnSelector(ImmutableList.of(filter1, filter2)); + new TestBloomFilterColumnSelector( + ImmutableList.of( + ByteBuffer.wrap(BloomFilterSerializersModule.bloomKFilterToBytes(filter1)), + ByteBuffer.wrap(BloomFilterSerializersModule.bloomKFilterToBytes(filter2)) + ) + ); BloomFilterMergeAggregator mergeAggregator = - new BloomFilterMergeAggregator(mergeDim, new BloomKFilter(maxNumValues)); + new BloomFilterMergeAggregator(mergeDim, maxNumValues); for (int i = 0; i < 2; ++i) { aggregateColumn(Collections.singletonList(mergeDim), mergeAggregator); @@ -437,7 +442,7 @@ public void testMergeValuesWithBuffersForGroupByV1() throws IOException ); BloomFilterMergeAggregator mergeAggregator = - new BloomFilterMergeAggregator(mergeDim, new BloomKFilter(maxNumValues)); + new BloomFilterMergeAggregator(mergeDim, maxNumValues); for (int i = 0; i < 2; ++i) { aggregateColumn(Collections.singletonList(mergeDim), mergeAggregator); diff --git a/extensions-core/druid-bloom-filter/src/test/java/org/apache/druid/query/aggregation/bloom/sql/BloomFilterSqlAggregatorTest.java b/extensions-core/druid-bloom-filter/src/test/java/org/apache/druid/query/aggregation/bloom/sql/BloomFilterSqlAggregatorTest.java index ad4f90acf6f1..f8f648952d5c 100644 --- a/extensions-core/druid-bloom-filter/src/test/java/org/apache/druid/query/aggregation/bloom/sql/BloomFilterSqlAggregatorTest.java +++ b/extensions-core/druid-bloom-filter/src/test/java/org/apache/druid/query/aggregation/bloom/sql/BloomFilterSqlAggregatorTest.java @@ -179,7 +179,6 @@ public void setUp() throws Exception .rows(CalciteTests.ROWS1_WITH_NUMERIC_DIMS) .buildMMappedIndex(); - walker = new SpecificSegmentsQuerySegmentWalker(conglomerate).add( DataSegment.builder() .dataSource(DATA_SOURCE) diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/util/CalciteTests.java b/sql/src/test/java/org/apache/druid/sql/calcite/util/CalciteTests.java index 2c09568d3b1d..8e8934a3a166 100644 --- a/sql/src/test/java/org/apache/druid/sql/calcite/util/CalciteTests.java +++ b/sql/src/test/java/org/apache/druid/sql/calcite/util/CalciteTests.java @@ -424,8 +424,6 @@ public AuthenticationResult createEscalatedAuthenticationResult() ) ); - - public static final List ROWS2 = ImmutableList.of( createRow("2000-01-01", "דרואיד", "he", 1.0), createRow("2000-01-01", "druid", "en", 1.0), From ab5e6080c135d0af39fea15cd038b2cead433bf6 Mon Sep 17 00:00:00 2001 From: Clint Wylie Date: Tue, 16 Apr 2019 21:24:09 -0700 Subject: [PATCH 02/10] there can be only 1 --- .../bloom/BaseBloomFilterAggregator.java | 103 +++++++++++- .../BaseBloomFilterBufferAggregator.java | 159 ------------------ .../bloom/BloomFilterAggregatorFactory.java | 53 ++++-- .../bloom/BloomFilterMergeAggregator.java | 20 ++- .../BloomFilterMergeAggregatorFactory.java | 24 +-- .../BloomFilterMergeBufferAggregator.java | 40 ----- .../bloom/DoubleBloomFilterAggregator.java | 20 ++- .../DoubleBloomFilterBufferAggregator.java | 38 ----- .../bloom/FloatBloomFilterAggregator.java | 20 ++- .../FloatBloomFilterBufferAggregator.java | 44 ----- .../bloom/LongBloomFilterAggregator.java | 20 ++- .../LongBloomFilterBufferAggregator.java | 38 ----- .../bloom/NoopBloomFilterAggregator.java | 19 ++- .../NoopBloomFilterBufferAggregator.java | 44 ----- .../bloom/StringBloomFilterAggregator.java | 32 +++- .../StringBloomFilterBufferAggregator.java | 56 ------ .../bloom/BloomFilterAggregatorTest.java | 34 ++-- 17 files changed, 269 insertions(+), 495 deletions(-) delete mode 100644 extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/BaseBloomFilterBufferAggregator.java delete mode 100644 extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/BloomFilterMergeBufferAggregator.java delete mode 100644 extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/DoubleBloomFilterBufferAggregator.java delete mode 100644 extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/FloatBloomFilterBufferAggregator.java delete mode 100644 extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/LongBloomFilterBufferAggregator.java delete mode 100644 extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/NoopBloomFilterBufferAggregator.java delete mode 100644 extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/StringBloomFilterBufferAggregator.java diff --git a/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/BaseBloomFilterAggregator.java b/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/BaseBloomFilterAggregator.java index 9f22d537c0a8..d501588a5ec3 100644 --- a/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/BaseBloomFilterAggregator.java +++ b/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/BaseBloomFilterAggregator.java @@ -20,25 +20,89 @@ package org.apache.druid.query.aggregation.bloom; import org.apache.druid.query.aggregation.Aggregator; +import org.apache.druid.query.aggregation.BufferAggregator; import org.apache.druid.query.filter.BloomKFilter; +import org.apache.druid.query.monomorphicprocessing.RuntimeShapeInspector; +import org.apache.druid.segment.BaseFloatColumnValueSelector; +import org.apache.druid.segment.BaseLongColumnValueSelector; import org.apache.druid.segment.BaseNullableColumnValueSelector; +import org.apache.druid.segment.DimensionSelector; import javax.annotation.Nullable; import java.nio.ByteBuffer; -public abstract class BaseBloomFilterAggregator implements Aggregator +public abstract class BaseBloomFilterAggregator + implements BufferAggregator, Aggregator { + + protected final ByteBuffer collector; protected final int maxNumEntries; protected final TSelector selector; - final ByteBuffer collector; - BaseBloomFilterAggregator(TSelector selector, int maxNumEntries) + BaseBloomFilterAggregator(TSelector selector, int maxNumEntries, boolean onHeap) { this.selector = selector; this.maxNumEntries = maxNumEntries; - BloomKFilter bloomFilter = new BloomKFilter(maxNumEntries); - this.collector = ByteBuffer.allocate(BloomKFilter.computeSizeBytes(maxNumEntries)); - BloomKFilter.serialize(collector, bloomFilter); + if (onHeap) { + BloomKFilter bloomFilter = new BloomKFilter(maxNumEntries); + this.collector = ByteBuffer.allocate(BloomKFilter.computeSizeBytes(maxNumEntries)); + BloomKFilter.serialize(collector, bloomFilter); + } else { + collector = null; + } + } + + abstract void bufferAdd(ByteBuffer buf); + + @Override + public void init(ByteBuffer buf, int position) + { + final ByteBuffer mutationBuffer = buf.duplicate(); + mutationBuffer.position(position); + BloomKFilter filter = new BloomKFilter(maxNumEntries); + BloomKFilter.serialize(mutationBuffer, filter); + } + + @Override + public void aggregate(ByteBuffer buf, int position) + { + final int oldPosition = buf.position(); + buf.position(position); + bufferAdd(buf); + buf.position(oldPosition); + } + + @Override + public Object get(ByteBuffer buf, int position) + { + ByteBuffer mutationBuffer = buf.duplicate(); + mutationBuffer.position(position); + // | k (byte) | numLongs (int) | bitset (long[numLongs]) | + int sizeBytes = BloomKFilter.computeSizeBytes(maxNumEntries); + mutationBuffer.limit(position + sizeBytes); + + ByteBuffer resultCopy = ByteBuffer.allocate(sizeBytes); + resultCopy.put(mutationBuffer.slice()); + resultCopy.rewind(); + return resultCopy; + } + + @Override + public float getFloat(ByteBuffer buf, int position) + { + throw new UnsupportedOperationException("BloomFilterBufferAggregator does not support getFloat()"); + } + + @Override + public long getLong(ByteBuffer buf, int position) + { + throw new UnsupportedOperationException("BloomFilterBufferAggregator does not support getLong()"); + } + + @Override + public double getDouble(ByteBuffer buf, int position) + { + throw new UnsupportedOperationException("BloomFilterBufferAggregator does not support getDouble()"); } @Nullable @@ -51,19 +115,19 @@ public Object get() @Override public float getFloat() { - throw new UnsupportedOperationException("BloomFilterAggregator does not support getFloat()"); + throw new UnsupportedOperationException("BloomFilterBufferAggregator does not support getFloat()"); } @Override public long getLong() { - throw new UnsupportedOperationException("BloomFilterAggregator does not support getLong()"); + throw new UnsupportedOperationException("BloomFilterBufferAggregator does not support getLong()"); } @Override public double getDouble() { - throw new UnsupportedOperationException("BloomFilterAggregator does not support getDouble()"); + throw new UnsupportedOperationException("BloomFilterBufferAggregator does not support getDouble()"); } @Override @@ -71,4 +135,25 @@ public void close() { // nothing to close } + + @Override + public void inspectRuntimeShape(RuntimeShapeInspector inspector) + { + inspector.visit("selector", selector); + } + + static void bufferAddFloat(ByteBuffer buffer, BaseFloatColumnValueSelector selector) + { + + } + + static void bufferAddLong(ByteBuffer buffer, BaseLongColumnValueSelector selector) + { + + } + + static void bufferAddDimension(ByteBuffer buffer, DimensionSelector selector) + { + + } } diff --git a/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/BaseBloomFilterBufferAggregator.java b/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/BaseBloomFilterBufferAggregator.java deleted file mode 100644 index 2f0184c4f4bc..000000000000 --- a/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/BaseBloomFilterBufferAggregator.java +++ /dev/null @@ -1,159 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.druid.query.aggregation.bloom; - -import org.apache.druid.common.config.NullHandling; -import org.apache.druid.query.aggregation.BufferAggregator; -import org.apache.druid.query.filter.BloomKFilter; -import org.apache.druid.query.monomorphicprocessing.RuntimeShapeInspector; -import org.apache.druid.segment.BaseDoubleColumnValueSelector; -import org.apache.druid.segment.BaseFloatColumnValueSelector; -import org.apache.druid.segment.BaseLongColumnValueSelector; -import org.apache.druid.segment.BaseNullableColumnValueSelector; -import org.apache.druid.segment.DimensionSelector; - -import java.nio.ByteBuffer; - -public abstract class BaseBloomFilterBufferAggregator - implements BufferAggregator -{ - protected final int maxNumEntries; - protected final TSelector selector; - - BaseBloomFilterBufferAggregator(TSelector selector, int maxNumEntries) - { - this.selector = selector; - this.maxNumEntries = maxNumEntries; - } - - abstract void bufferAdd(ByteBuffer buf); - - @Override - public void init(ByteBuffer buf, int position) - { - final ByteBuffer mutationBuffer = buf.duplicate(); - mutationBuffer.position(position); - BloomKFilter filter = new BloomKFilter(maxNumEntries); - BloomKFilter.serialize(mutationBuffer, filter); - } - - @Override - public void aggregate(ByteBuffer buf, int position) - { - final int oldPosition = buf.position(); - buf.position(position); - bufferAdd(buf); - buf.position(oldPosition); - } - - - @Override - public Object get(ByteBuffer buf, int position) - { - ByteBuffer mutationBuffer = buf.duplicate(); - mutationBuffer.position(position); - // | k (byte) | numLongs (int) | bitset (long[numLongs]) | - int sizeBytes = BloomKFilter.computeSizeBytes(maxNumEntries); - mutationBuffer.limit(position + sizeBytes); - - ByteBuffer resultCopy = ByteBuffer.allocate(sizeBytes); - resultCopy.put(mutationBuffer.slice()); - resultCopy.rewind(); - return resultCopy; - } - - @Override - public float getFloat(ByteBuffer buf, int position) - { - throw new UnsupportedOperationException("BloomFilterBufferAggregator does not support getFloat()"); - } - - @Override - public long getLong(ByteBuffer buf, int position) - { - throw new UnsupportedOperationException("BloomFilterBufferAggregator does not support getLong()"); - } - - @Override - public double getDouble(ByteBuffer buf, int position) - { - throw new UnsupportedOperationException("BloomFilterBufferAggregator does not support getDouble()"); - } - - @Override - public void close() - { - // nothing to close - } - - @Override - public void inspectRuntimeShape(RuntimeShapeInspector inspector) - { - inspector.visit("selector", selector); - } - - public static void bufferAddFloat(ByteBuffer buffer, BaseFloatColumnValueSelector selector) - { - if (NullHandling.replaceWithDefault() || !selector.isNull()) { - BloomKFilter.addFloat(buffer, selector.getFloat()); - } else { - BloomKFilter.addBytes(buffer, null, 0, 0); - } - } - - public static void bufferAddLong(ByteBuffer buffer, BaseLongColumnValueSelector selector) - { - if (NullHandling.replaceWithDefault() || !selector.isNull()) { - BloomKFilter.addLong(buffer, selector.getLong()); - } else { - BloomKFilter.addBytes(buffer, null, 0, 0); - } - } - - public static void bufferAddDouble(ByteBuffer buffer, BaseDoubleColumnValueSelector selector) - { - if (NullHandling.replaceWithDefault() || !selector.isNull()) { - BloomKFilter.addDouble(buffer, selector.getDouble()); - } else { - BloomKFilter.addBytes(buffer, null, 0, 0); - } - } - - public static void bufferAddDim(ByteBuffer buffer, DimensionSelector selector) - { - if (selector.getRow().size() > 1) { - selector.getRow().forEach(v -> { - String value = selector.lookupName(v); - if (value == null) { - BloomKFilter.addBytes(buffer, null, 0, 0); - } else { - BloomKFilter.addString(buffer, value); - } - }); - } else { - String value = (String) selector.getObject(); - if (value == null) { - BloomKFilter.addBytes(buffer, null, 0, 0); - } else { - BloomKFilter.addString(buffer, value); - } - } - } -} diff --git a/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/BloomFilterAggregatorFactory.java b/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/BloomFilterAggregatorFactory.java index 3f10b8a9ee3d..318ab6c19f8a 100644 --- a/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/BloomFilterAggregatorFactory.java +++ b/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/BloomFilterAggregatorFactory.java @@ -83,14 +83,13 @@ public BloomFilterAggregatorFactory( @Override public Aggregator factorize(ColumnSelectorFactory columnFactory) { - BloomKFilter filter = new BloomKFilter(maxNumEntries); ColumnCapabilities capabilities = columnFactory.getColumnCapabilities(field.getDimension()); if (capabilities == null) { BaseNullableColumnValueSelector selector = columnFactory.makeColumnValueSelector(field.getDimension()); if (selector instanceof NilColumnValueSelector) { // BloomKFilter must be the same size so we cannot use a constant for the empty agg - return new NoopBloomFilterAggregator(filter); + return new NoopBloomFilterAggregator(maxNumEntries, true); } throw new IAE( "Cannot create bloom filter buffer aggregator for column selector type [%s]", @@ -100,13 +99,29 @@ public Aggregator factorize(ColumnSelectorFactory columnFactory) ValueType type = capabilities.getType(); switch (type) { case STRING: - return new StringBloomFilterAggregator(columnFactory.makeDimensionSelector(field), maxNumEntries); + return new StringBloomFilterAggregator( + columnFactory.makeDimensionSelector(field), + maxNumEntries, + true + ); case LONG: - return new LongBloomFilterAggregator(columnFactory.makeColumnValueSelector(field.getDimension()), maxNumEntries); + return new LongBloomFilterAggregator( + columnFactory.makeColumnValueSelector(field.getDimension()), + maxNumEntries, + true + ); case FLOAT: - return new FloatBloomFilterAggregator(columnFactory.makeColumnValueSelector(field.getDimension()), maxNumEntries); + return new FloatBloomFilterAggregator( + columnFactory.makeColumnValueSelector(field.getDimension()), + maxNumEntries, + true + ); case DOUBLE: - return new DoubleBloomFilterAggregator(columnFactory.makeColumnValueSelector(field.getDimension()), maxNumEntries); + return new DoubleBloomFilterAggregator( + columnFactory.makeColumnValueSelector(field.getDimension()), + maxNumEntries, + true + ); default: throw new IAE("Cannot create bloom filter aggregator for invalid column type [%s]", type); } @@ -120,7 +135,7 @@ public BufferAggregator factorizeBuffered(ColumnSelectorFactory columnFactory) if (capabilities == null) { BaseNullableColumnValueSelector selector = columnFactory.makeColumnValueSelector(field.getDimension()); if (selector instanceof NilColumnValueSelector) { - return new NoopBloomFilterBufferAggregator(maxNumEntries); + return new NoopBloomFilterAggregator(maxNumEntries, false); } throw new IAE( "Cannot create bloom filter buffer aggregator for column selector type [%s]", @@ -131,18 +146,28 @@ public BufferAggregator factorizeBuffered(ColumnSelectorFactory columnFactory) ValueType type = capabilities.getType(); switch (type) { case STRING: - return new StringBloomFilterBufferAggregator(columnFactory.makeDimensionSelector(field), maxNumEntries); + return new StringBloomFilterAggregator( + columnFactory.makeDimensionSelector(field), + maxNumEntries, + false + ); case LONG: - return new LongBloomFilterBufferAggregator( - columnFactory.makeColumnValueSelector(field.getDimension()), maxNumEntries + return new LongBloomFilterAggregator( + columnFactory.makeColumnValueSelector(field.getDimension()), + maxNumEntries, + false ); case FLOAT: - return new FloatBloomFilterBufferAggregator( - columnFactory.makeColumnValueSelector(field.getDimension()), maxNumEntries + return new FloatBloomFilterAggregator( + columnFactory.makeColumnValueSelector(field.getDimension()), + maxNumEntries, + false ); case DOUBLE: - return new DoubleBloomFilterBufferAggregator( - columnFactory.makeColumnValueSelector(field.getDimension()), maxNumEntries + return new DoubleBloomFilterAggregator( + columnFactory.makeColumnValueSelector(field.getDimension()), + maxNumEntries, + false ); default: throw new IAE("Cannot create bloom filter buffer aggregator for invalid column type [%s]", type); diff --git a/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/BloomFilterMergeAggregator.java b/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/BloomFilterMergeAggregator.java index d2574ee4f966..aac85b023bb4 100644 --- a/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/BloomFilterMergeAggregator.java +++ b/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/BloomFilterMergeAggregator.java @@ -24,21 +24,23 @@ import java.nio.ByteBuffer; -public final class BloomFilterMergeAggregator extends BaseBloomFilterAggregator> +public final class BloomFilterMergeAggregator extends BaseBloomFilterAggregator> { - public BloomFilterMergeAggregator(ColumnValueSelector selector, int maxNumEntries) + public BloomFilterMergeAggregator(ColumnValueSelector selector, int maxNumEntries, boolean onHeap) { - super(selector, maxNumEntries); + super(selector, maxNumEntries, onHeap); + } + + @Override + public void bufferAdd(ByteBuffer buf) + { + ByteBuffer other = selector.getObject(); + BloomKFilter.mergeBloomFilterByteBuffers(buf, buf.position(), other, other.position()); } @Override public void aggregate() { - Object other = selector.getObject(); - if (other != null) { - if (other instanceof ByteBuffer) { - BloomKFilter.mergeBloomFilterByteBuffers(collector, 0, (ByteBuffer) other, 0); - } - } + bufferAdd(collector); } } diff --git a/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/BloomFilterMergeAggregatorFactory.java b/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/BloomFilterMergeAggregatorFactory.java index 90287495f343..ed5ce2904654 100644 --- a/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/BloomFilterMergeAggregatorFactory.java +++ b/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/BloomFilterMergeAggregatorFactory.java @@ -47,23 +47,13 @@ public class BloomFilterMergeAggregatorFactory extends BloomFilterAggregatorFact @Override public Aggregator factorize(final ColumnSelectorFactory metricFactory) { - final BaseNullableColumnValueSelector selector = metricFactory.makeColumnValueSelector(fieldName); - // null columns should be empty bloom filters by this point, so encountering a nil column in merge agg is unexpected - if (selector instanceof NilColumnValueSelector) { - throw new ISE("WTF?! Unexpected NilColumnValueSelector"); - } - return new BloomFilterMergeAggregator((ColumnValueSelector) selector, getMaxNumEntries()); + return makeMergeAggregator(metricFactory); } @Override public BufferAggregator factorizeBuffered(final ColumnSelectorFactory metricFactory) { - final BaseNullableColumnValueSelector selector = metricFactory.makeColumnValueSelector(fieldName); - // null columns should be empty bloom filters by this point, so encountering a nil column in merge agg is unexpected - if (selector instanceof NilColumnValueSelector) { - throw new ISE("WTF?! Unexpected NilColumnValueSelector"); - } - return new BloomFilterMergeBufferAggregator((ColumnValueSelector) selector, getMaxNumEntries()); + return makeMergeAggregator(metricFactory); } @Override @@ -80,4 +70,14 @@ public byte[] getCacheKey() .appendInt(getMaxNumEntries()) .build(); } + + private BloomFilterMergeAggregator makeMergeAggregator(ColumnSelectorFactory metricFactory) + { + final BaseNullableColumnValueSelector selector = metricFactory.makeColumnValueSelector(fieldName); + // null columns should be empty bloom filters by this point, so encountering a nil column in merge agg is unexpected + if (selector instanceof NilColumnValueSelector) { + throw new ISE("WTF?! Unexpected NilColumnValueSelector"); + } + return new BloomFilterMergeAggregator((ColumnValueSelector) selector, getMaxNumEntries(), true); + } } diff --git a/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/BloomFilterMergeBufferAggregator.java b/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/BloomFilterMergeBufferAggregator.java deleted file mode 100644 index 026a23e7285d..000000000000 --- a/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/BloomFilterMergeBufferAggregator.java +++ /dev/null @@ -1,40 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.druid.query.aggregation.bloom; - -import org.apache.druid.query.filter.BloomKFilter; -import org.apache.druid.segment.ColumnValueSelector; - -import java.nio.ByteBuffer; - -public final class BloomFilterMergeBufferAggregator extends BaseBloomFilterBufferAggregator> -{ - public BloomFilterMergeBufferAggregator(ColumnValueSelector selector, int maxNumEntries) - { - super(selector, maxNumEntries); - } - - @Override - public void bufferAdd(ByteBuffer buf) - { - ByteBuffer other = selector.getObject(); - BloomKFilter.mergeBloomFilterByteBuffers(buf, buf.position(), other, other.position()); - } -} diff --git a/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/DoubleBloomFilterAggregator.java b/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/DoubleBloomFilterAggregator.java index 84d72baf3cf6..ad2f7e74ac77 100644 --- a/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/DoubleBloomFilterAggregator.java +++ b/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/DoubleBloomFilterAggregator.java @@ -19,18 +19,32 @@ package org.apache.druid.query.aggregation.bloom; +import org.apache.druid.common.config.NullHandling; +import org.apache.druid.query.filter.BloomKFilter; import org.apache.druid.segment.BaseDoubleColumnValueSelector; +import java.nio.ByteBuffer; + public final class DoubleBloomFilterAggregator extends BaseBloomFilterAggregator { - DoubleBloomFilterAggregator(BaseDoubleColumnValueSelector selector, int maxNumEntries) + DoubleBloomFilterAggregator(BaseDoubleColumnValueSelector selector, int maxNumEntries, boolean onHeap) + { + super(selector, maxNumEntries, onHeap); + } + + @Override + public void bufferAdd(ByteBuffer buf) { - super(selector, maxNumEntries); + if (NullHandling.replaceWithDefault() || !selector.isNull()) { + BloomKFilter.addDouble(buf, selector.getDouble()); + } else { + BloomKFilter.addBytes(buf, null, 0, 0); + } } @Override public void aggregate() { - BaseBloomFilterBufferAggregator.bufferAddDouble(collector, selector); + bufferAdd(collector); } } diff --git a/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/DoubleBloomFilterBufferAggregator.java b/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/DoubleBloomFilterBufferAggregator.java deleted file mode 100644 index 0e5aeca36f17..000000000000 --- a/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/DoubleBloomFilterBufferAggregator.java +++ /dev/null @@ -1,38 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.druid.query.aggregation.bloom; - -import org.apache.druid.segment.BaseDoubleColumnValueSelector; - -import java.nio.ByteBuffer; - -public final class DoubleBloomFilterBufferAggregator extends BaseBloomFilterBufferAggregator -{ - DoubleBloomFilterBufferAggregator(BaseDoubleColumnValueSelector selector, int maxNumEntries) - { - super(selector, maxNumEntries); - } - - @Override - public void bufferAdd(ByteBuffer buf) - { - BaseBloomFilterBufferAggregator.bufferAddDouble(buf, selector); - } -} diff --git a/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/FloatBloomFilterAggregator.java b/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/FloatBloomFilterAggregator.java index 17dfe60ea9f7..b3a91591e939 100644 --- a/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/FloatBloomFilterAggregator.java +++ b/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/FloatBloomFilterAggregator.java @@ -19,18 +19,32 @@ package org.apache.druid.query.aggregation.bloom; +import org.apache.druid.common.config.NullHandling; +import org.apache.druid.query.filter.BloomKFilter; import org.apache.druid.segment.BaseFloatColumnValueSelector; +import java.nio.ByteBuffer; + public final class FloatBloomFilterAggregator extends BaseBloomFilterAggregator { - FloatBloomFilterAggregator(BaseFloatColumnValueSelector selector, int maxNumEntries) + FloatBloomFilterAggregator(BaseFloatColumnValueSelector selector, int maxNumEntries, boolean onHeap) + { + super(selector, maxNumEntries, onHeap); + } + + @Override + public void bufferAdd(ByteBuffer buf) { - super(selector, maxNumEntries); + if (NullHandling.replaceWithDefault() || !selector.isNull()) { + BloomKFilter.addFloat(buf, selector.getFloat()); + } else { + BloomKFilter.addBytes(buf, null, 0, 0); + } } @Override public void aggregate() { - BaseBloomFilterBufferAggregator.bufferAddFloat(collector, selector); + bufferAdd(collector); } } diff --git a/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/FloatBloomFilterBufferAggregator.java b/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/FloatBloomFilterBufferAggregator.java deleted file mode 100644 index 27e88d48d7a9..000000000000 --- a/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/FloatBloomFilterBufferAggregator.java +++ /dev/null @@ -1,44 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.druid.query.aggregation.bloom; - -import org.apache.druid.common.config.NullHandling; -import org.apache.druid.query.filter.BloomKFilter; -import org.apache.druid.segment.BaseFloatColumnValueSelector; - -import java.nio.ByteBuffer; - -public final class FloatBloomFilterBufferAggregator extends BaseBloomFilterBufferAggregator -{ - FloatBloomFilterBufferAggregator(BaseFloatColumnValueSelector selector, int maxNumEntries) - { - super(selector, maxNumEntries); - } - - @Override - public void bufferAdd(ByteBuffer buf) - { - if (NullHandling.replaceWithDefault() || !selector.isNull()) { - BloomKFilter.addFloat(buf, selector.getFloat()); - } else { - BloomKFilter.addBytes(buf, null, 0, 0); - } - } -} diff --git a/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/LongBloomFilterAggregator.java b/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/LongBloomFilterAggregator.java index 8d6b934f7c35..c6d61dfa15bd 100644 --- a/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/LongBloomFilterAggregator.java +++ b/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/LongBloomFilterAggregator.java @@ -19,18 +19,32 @@ package org.apache.druid.query.aggregation.bloom; +import org.apache.druid.common.config.NullHandling; +import org.apache.druid.query.filter.BloomKFilter; import org.apache.druid.segment.BaseLongColumnValueSelector; +import java.nio.ByteBuffer; + public final class LongBloomFilterAggregator extends BaseBloomFilterAggregator { - LongBloomFilterAggregator(BaseLongColumnValueSelector selector, int maxNumEntries) + LongBloomFilterAggregator(BaseLongColumnValueSelector selector, int maxNumEntries, boolean onHeap) + { + super(selector, maxNumEntries, onHeap); + } + + @Override + public void bufferAdd(ByteBuffer buf) { - super(selector, maxNumEntries); + if (NullHandling.replaceWithDefault() || !selector.isNull()) { + BloomKFilter.addLong(buf, selector.getLong()); + } else { + BloomKFilter.addBytes(buf, null, 0, 0); + } } @Override public void aggregate() { - BaseBloomFilterBufferAggregator.bufferAddLong(collector, selector); + bufferAdd(collector); } } diff --git a/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/LongBloomFilterBufferAggregator.java b/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/LongBloomFilterBufferAggregator.java deleted file mode 100644 index 04c34fecb8bd..000000000000 --- a/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/LongBloomFilterBufferAggregator.java +++ /dev/null @@ -1,38 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.druid.query.aggregation.bloom; - -import org.apache.druid.segment.BaseLongColumnValueSelector; - -import java.nio.ByteBuffer; - -public final class LongBloomFilterBufferAggregator extends BaseBloomFilterBufferAggregator -{ - LongBloomFilterBufferAggregator(BaseLongColumnValueSelector selector, int maxNumEntries) - { - super(selector, maxNumEntries); - } - - @Override - public void bufferAdd(ByteBuffer buf) - { - BaseBloomFilterBufferAggregator.bufferAddLong(buf, selector); - } -} diff --git a/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/NoopBloomFilterAggregator.java b/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/NoopBloomFilterAggregator.java index 928747adf2e7..ec23df5e780e 100644 --- a/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/NoopBloomFilterAggregator.java +++ b/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/NoopBloomFilterAggregator.java @@ -19,14 +19,27 @@ package org.apache.druid.query.aggregation.bloom; -import org.apache.druid.query.filter.BloomKFilter; import org.apache.druid.segment.NilColumnValueSelector; +import java.nio.ByteBuffer; + public final class NoopBloomFilterAggregator extends BaseBloomFilterAggregator { - NoopBloomFilterAggregator(BloomKFilter collector) + NoopBloomFilterAggregator(int maxNumEntries, boolean onHeap) + { + super(NilColumnValueSelector.instance(), maxNumEntries, onHeap); + } + + @Override + public void bufferAdd(ByteBuffer buf) { - super(NilColumnValueSelector.instance(), 1); + // nothing to do + } + + @Override + public void aggregate(ByteBuffer buf, int position) + { + // nothing to do } @Override diff --git a/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/NoopBloomFilterBufferAggregator.java b/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/NoopBloomFilterBufferAggregator.java deleted file mode 100644 index 6a71d4c5a3de..000000000000 --- a/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/NoopBloomFilterBufferAggregator.java +++ /dev/null @@ -1,44 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.druid.query.aggregation.bloom; - -import org.apache.druid.segment.NilColumnValueSelector; - -import java.nio.ByteBuffer; - -public final class NoopBloomFilterBufferAggregator extends BaseBloomFilterBufferAggregator -{ - NoopBloomFilterBufferAggregator(int maxNumEntries) - { - super(NilColumnValueSelector.instance(), maxNumEntries); - } - - @Override - public void bufferAdd(ByteBuffer buf) - { - // nothing to do - } - - @Override - public void aggregate(ByteBuffer buf, int position) - { - // nothing to do - } -} diff --git a/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/StringBloomFilterAggregator.java b/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/StringBloomFilterAggregator.java index 5f47241f819f..b2da96b9541f 100644 --- a/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/StringBloomFilterAggregator.java +++ b/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/StringBloomFilterAggregator.java @@ -19,18 +19,44 @@ package org.apache.druid.query.aggregation.bloom; +import org.apache.druid.query.filter.BloomKFilter; import org.apache.druid.segment.DimensionSelector; +import java.nio.ByteBuffer; + public final class StringBloomFilterAggregator extends BaseBloomFilterAggregator { - StringBloomFilterAggregator(DimensionSelector selector, int maxNumEntries) + + StringBloomFilterAggregator(DimensionSelector selector, int maxNumEntries, boolean onHeap) + { + super(selector, maxNumEntries, onHeap); + } + + @Override + public void bufferAdd(ByteBuffer buf) { - super(selector, maxNumEntries); + if (selector.getRow().size() > 1) { + selector.getRow().forEach(v -> { + String value = selector.lookupName(v); + if (value == null) { + BloomKFilter.addBytes(buf, null, 0, 0); + } else { + BloomKFilter.addString(buf, value); + } + }); + } else { + String value = (String) selector.getObject(); + if (value == null) { + BloomKFilter.addBytes(buf, null, 0, 0); + } else { + BloomKFilter.addString(buf, value); + } + } } @Override public void aggregate() { - BaseBloomFilterBufferAggregator.bufferAddDim(collector, selector); + bufferAdd(collector); } } diff --git a/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/StringBloomFilterBufferAggregator.java b/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/StringBloomFilterBufferAggregator.java deleted file mode 100644 index c7c17c940e0d..000000000000 --- a/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/StringBloomFilterBufferAggregator.java +++ /dev/null @@ -1,56 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.druid.query.aggregation.bloom; - -import org.apache.druid.query.filter.BloomKFilter; -import org.apache.druid.segment.DimensionSelector; - -import java.nio.ByteBuffer; - -public final class StringBloomFilterBufferAggregator extends BaseBloomFilterBufferAggregator -{ - - StringBloomFilterBufferAggregator(DimensionSelector selector, int maxNumEntries) - { - super(selector, maxNumEntries); - } - - @Override - public void bufferAdd(ByteBuffer buf) - { - if (selector.getRow().size() > 1) { - selector.getRow().forEach(v -> { - String value = selector.lookupName(v); - if (value == null) { - BloomKFilter.addBytes(buf, null, 0, 0); - } else { - BloomKFilter.addString(buf, value); - } - }); - } else { - String value = (String) selector.getObject(); - if (value == null) { - BloomKFilter.addBytes(buf, null, 0, 0); - } else { - BloomKFilter.addString(buf, value); - } - } - } -} diff --git a/extensions-core/druid-bloom-filter/src/test/java/org/apache/druid/query/aggregation/bloom/BloomFilterAggregatorTest.java b/extensions-core/druid-bloom-filter/src/test/java/org/apache/druid/query/aggregation/bloom/BloomFilterAggregatorTest.java index 21587a221d9b..879f3292766a 100644 --- a/extensions-core/druid-bloom-filter/src/test/java/org/apache/druid/query/aggregation/bloom/BloomFilterAggregatorTest.java +++ b/extensions-core/druid-bloom-filter/src/test/java/org/apache/druid/query/aggregation/bloom/BloomFilterAggregatorTest.java @@ -241,7 +241,7 @@ public BloomFilterAggregatorTest() public void testAggregateValues() throws IOException { DimensionSelector dimSelector = new CardinalityAggregatorTest.TestDimensionSelector(values1, null); - StringBloomFilterAggregator agg = new StringBloomFilterAggregator(dimSelector, maxNumValues); + StringBloomFilterAggregator agg = new StringBloomFilterAggregator(dimSelector, maxNumValues, true); for (int i = 0; i < values1.size(); ++i) { aggregateDimension(Collections.singletonList(dimSelector), agg); @@ -256,7 +256,7 @@ public void testAggregateValues() throws IOException public void testAggregateLongValues() throws IOException { TestLongColumnSelector selector = new TestLongColumnSelector(Arrays.asList(longValues1)); - LongBloomFilterAggregator agg = new LongBloomFilterAggregator(selector, maxNumValues); + LongBloomFilterAggregator agg = new LongBloomFilterAggregator(selector, maxNumValues, true); for (Long ignored : longValues1) { aggregateColumn(Collections.singletonList(selector), agg); @@ -271,7 +271,7 @@ public void testAggregateLongValues() throws IOException public void testAggregateFloatValues() throws IOException { TestFloatColumnSelector selector = new TestFloatColumnSelector(Arrays.asList(floatValues1)); - FloatBloomFilterAggregator agg = new FloatBloomFilterAggregator(selector, maxNumValues); + FloatBloomFilterAggregator agg = new FloatBloomFilterAggregator(selector, maxNumValues, true); for (Float ignored : floatValues1) { aggregateColumn(Collections.singletonList(selector), agg); @@ -286,7 +286,7 @@ public void testAggregateFloatValues() throws IOException public void testAggregateDoubleValues() throws IOException { TestDoubleColumnSelector selector = new TestDoubleColumnSelector(Arrays.asList(doubleValues1)); - DoubleBloomFilterAggregator agg = new DoubleBloomFilterAggregator(selector, maxNumValues); + DoubleBloomFilterAggregator agg = new DoubleBloomFilterAggregator(selector, maxNumValues, true); for (Double ignored : doubleValues1) { aggregateColumn(Collections.singletonList(selector), agg); @@ -301,7 +301,7 @@ public void testAggregateDoubleValues() throws IOException public void testBufferAggregateStringValues() throws IOException { DimensionSelector dimSelector = new CardinalityAggregatorTest.TestDimensionSelector(values2, null); - StringBloomFilterBufferAggregator agg = new StringBloomFilterBufferAggregator(dimSelector, maxNumValues); + StringBloomFilterAggregator agg = new StringBloomFilterAggregator(dimSelector, maxNumValues, true); int maxSize = valueAggregatorFactory.getMaxIntermediateSizeWithNulls(); ByteBuffer buf = ByteBuffer.allocate(maxSize + 64); @@ -322,7 +322,7 @@ public void testBufferAggregateStringValues() throws IOException public void testBufferAggregateLongValues() throws IOException { TestLongColumnSelector selector = new TestLongColumnSelector(Arrays.asList(longValues1)); - LongBloomFilterBufferAggregator agg = new LongBloomFilterBufferAggregator(selector, maxNumValues); + LongBloomFilterAggregator agg = new LongBloomFilterAggregator(selector, maxNumValues, true); int maxSize = valueAggregatorFactory.getMaxIntermediateSizeWithNulls(); ByteBuffer buf = ByteBuffer.allocate(maxSize + 64); @@ -342,7 +342,7 @@ public void testBufferAggregateLongValues() throws IOException public void testBufferAggregateFloatValues() throws IOException { TestFloatColumnSelector selector = new TestFloatColumnSelector(Arrays.asList(floatValues1)); - FloatBloomFilterBufferAggregator agg = new FloatBloomFilterBufferAggregator(selector, maxNumValues); + FloatBloomFilterAggregator agg = new FloatBloomFilterAggregator(selector, maxNumValues, true); int maxSize = valueAggregatorFactory.getMaxIntermediateSizeWithNulls(); ByteBuffer buf = ByteBuffer.allocate(maxSize + 64); @@ -362,7 +362,7 @@ public void testBufferAggregateFloatValues() throws IOException public void testBufferAggregateDoubleValues() throws IOException { TestDoubleColumnSelector selector = new TestDoubleColumnSelector(Arrays.asList(doubleValues1)); - DoubleBloomFilterBufferAggregator agg = new DoubleBloomFilterBufferAggregator(selector, maxNumValues); + DoubleBloomFilterAggregator agg = new DoubleBloomFilterAggregator(selector, maxNumValues, true); int maxSize = valueAggregatorFactory.getMaxIntermediateSizeWithNulls(); ByteBuffer buf = ByteBuffer.allocate(maxSize + 64); @@ -384,8 +384,8 @@ public void testCombineValues() throws IOException DimensionSelector dimSelector1 = new CardinalityAggregatorTest.TestDimensionSelector(values1, null); DimensionSelector dimSelector2 = new CardinalityAggregatorTest.TestDimensionSelector(values2, null); - StringBloomFilterAggregator agg1 = new StringBloomFilterAggregator(dimSelector1, maxNumValues); - StringBloomFilterAggregator agg2 = new StringBloomFilterAggregator(dimSelector2, maxNumValues); + StringBloomFilterAggregator agg1 = new StringBloomFilterAggregator(dimSelector1, maxNumValues, true); + StringBloomFilterAggregator agg2 = new StringBloomFilterAggregator(dimSelector2, maxNumValues, true); for (int i = 0; i < values1.size(); ++i) { aggregateDimension(Collections.singletonList(dimSelector1), agg1); @@ -408,8 +408,8 @@ public void testCombineValues() throws IOException @Test public void testMergeValues() throws IOException { - final TestBloomFilterColumnSelector mergeDim = - new TestBloomFilterColumnSelector( + final TestBloomFilterBufferColumnSelector mergeDim = + new TestBloomFilterBufferColumnSelector( ImmutableList.of( ByteBuffer.wrap(BloomFilterSerializersModule.bloomKFilterToBytes(filter1)), ByteBuffer.wrap(BloomFilterSerializersModule.bloomKFilterToBytes(filter2)) @@ -417,7 +417,7 @@ public void testMergeValues() throws IOException ); BloomFilterMergeAggregator mergeAggregator = - new BloomFilterMergeAggregator(mergeDim, maxNumValues); + new BloomFilterMergeAggregator(mergeDim, maxNumValues, true); for (int i = 0; i < 2; ++i) { aggregateColumn(Collections.singletonList(mergeDim), mergeAggregator); @@ -433,8 +433,8 @@ public void testMergeValues() throws IOException @Test public void testMergeValuesWithBuffersForGroupByV1() throws IOException { - final TestBloomFilterColumnSelector mergeDim = - new TestBloomFilterColumnSelector( + final TestBloomFilterBufferColumnSelector mergeDim = + new TestBloomFilterBufferColumnSelector( ImmutableList.of( ByteBuffer.wrap(BloomFilterSerializersModule.bloomKFilterToBytes(filter1)), ByteBuffer.wrap(BloomFilterSerializersModule.bloomKFilterToBytes(filter2)) @@ -442,7 +442,7 @@ public void testMergeValuesWithBuffersForGroupByV1() throws IOException ); BloomFilterMergeAggregator mergeAggregator = - new BloomFilterMergeAggregator(mergeDim, maxNumValues); + new BloomFilterMergeAggregator(mergeDim, maxNumValues, true); for (int i = 0; i < 2; ++i) { aggregateColumn(Collections.singletonList(mergeDim), mergeAggregator); @@ -466,7 +466,7 @@ public void testBuferMergeValues() throws IOException ) ); - BloomFilterMergeBufferAggregator mergeAggregator = new BloomFilterMergeBufferAggregator(mergeDim, maxNumValues); + BloomFilterMergeAggregator mergeAggregator = new BloomFilterMergeAggregator(mergeDim, maxNumValues, false); int maxSize = valueAggregatorFactory.getCombiningFactory().getMaxIntermediateSizeWithNulls(); ByteBuffer buf = ByteBuffer.allocate(maxSize + 64); From fea8d832c8871a344f0c5056e0e47d927217a039 Mon Sep 17 00:00:00 2001 From: Clint Wylie Date: Tue, 16 Apr 2019 22:04:35 -0700 Subject: [PATCH 03/10] simplify --- .../bloom/BaseBloomFilterAggregator.java | 6 ++ .../bloom/BloomFilterAggregatorFactory.java | 13 +---- .../bloom/BloomFilterMergeAggregator.java | 11 ++-- .../bloom/DoubleBloomFilterAggregator.java | 6 -- .../bloom/FloatBloomFilterAggregator.java | 6 -- .../bloom/LongBloomFilterAggregator.java | 6 -- .../bloom/StringBloomFilterAggregator.java | 6 -- .../bloom/BloomFilterAggregatorTest.java | 57 +++++++++++++------ .../bloom/BloomFilterGroupByQueryTest.java | 10 ++-- 9 files changed, 57 insertions(+), 64 deletions(-) diff --git a/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/BaseBloomFilterAggregator.java b/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/BaseBloomFilterAggregator.java index d501588a5ec3..5a5403c4a61f 100644 --- a/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/BaseBloomFilterAggregator.java +++ b/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/BaseBloomFilterAggregator.java @@ -105,6 +105,12 @@ public double getDouble(ByteBuffer buf, int position) throw new UnsupportedOperationException("BloomFilterBufferAggregator does not support getDouble()"); } + @Override + public void aggregate() + { + aggregate(collector, 0); + } + @Nullable @Override public Object get() diff --git a/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/BloomFilterAggregatorFactory.java b/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/BloomFilterAggregatorFactory.java index 318ab6c19f8a..72eadeb20a61 100644 --- a/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/BloomFilterAggregatorFactory.java +++ b/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/BloomFilterAggregatorFactory.java @@ -231,18 +231,7 @@ public Object deserialize(Object object) @Override public Object finalizeComputation(Object object) { - try { - if (object instanceof ByteBuffer) { - return BloomKFilter.deserialize((ByteBuffer) object); - } else if (object instanceof byte[]) { - return BloomKFilter.deserialize(ByteBuffer.wrap((byte[]) object)); - } else { - return object; - } - } - catch (IOException ioe) { - throw new RuntimeException("Failed to deserialize BloomKFilter", ioe); - } + return object; } @JsonProperty diff --git a/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/BloomFilterMergeAggregator.java b/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/BloomFilterMergeAggregator.java index aac85b023bb4..011f2f6c563d 100644 --- a/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/BloomFilterMergeAggregator.java +++ b/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/BloomFilterMergeAggregator.java @@ -19,6 +19,7 @@ package org.apache.druid.query.aggregation.bloom; +import org.apache.druid.java.util.common.ISE; import org.apache.druid.query.filter.BloomKFilter; import org.apache.druid.segment.ColumnValueSelector; @@ -35,12 +36,10 @@ public BloomFilterMergeAggregator(ColumnValueSelector selector, int public void bufferAdd(ByteBuffer buf) { ByteBuffer other = selector.getObject(); + if (other == null) { + // nulls should be empty bloom filters by this point, so encountering a nil column in merge agg is unexpected + throw new ISE("WTF?! Unexpected null value in BloomFilterMergeAggregator"); + } BloomKFilter.mergeBloomFilterByteBuffers(buf, buf.position(), other, other.position()); } - - @Override - public void aggregate() - { - bufferAdd(collector); - } } diff --git a/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/DoubleBloomFilterAggregator.java b/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/DoubleBloomFilterAggregator.java index ad2f7e74ac77..8aa899e1633d 100644 --- a/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/DoubleBloomFilterAggregator.java +++ b/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/DoubleBloomFilterAggregator.java @@ -41,10 +41,4 @@ public void bufferAdd(ByteBuffer buf) BloomKFilter.addBytes(buf, null, 0, 0); } } - - @Override - public void aggregate() - { - bufferAdd(collector); - } } diff --git a/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/FloatBloomFilterAggregator.java b/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/FloatBloomFilterAggregator.java index b3a91591e939..0a7c042144a9 100644 --- a/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/FloatBloomFilterAggregator.java +++ b/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/FloatBloomFilterAggregator.java @@ -41,10 +41,4 @@ public void bufferAdd(ByteBuffer buf) BloomKFilter.addBytes(buf, null, 0, 0); } } - - @Override - public void aggregate() - { - bufferAdd(collector); - } } diff --git a/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/LongBloomFilterAggregator.java b/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/LongBloomFilterAggregator.java index c6d61dfa15bd..3e232ce5082b 100644 --- a/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/LongBloomFilterAggregator.java +++ b/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/LongBloomFilterAggregator.java @@ -41,10 +41,4 @@ public void bufferAdd(ByteBuffer buf) BloomKFilter.addBytes(buf, null, 0, 0); } } - - @Override - public void aggregate() - { - bufferAdd(collector); - } } diff --git a/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/StringBloomFilterAggregator.java b/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/StringBloomFilterAggregator.java index b2da96b9541f..f3f6daec75ef 100644 --- a/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/StringBloomFilterAggregator.java +++ b/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/StringBloomFilterAggregator.java @@ -53,10 +53,4 @@ public void bufferAdd(ByteBuffer buf) } } } - - @Override - public void aggregate() - { - bufferAdd(collector); - } } diff --git a/extensions-core/druid-bloom-filter/src/test/java/org/apache/druid/query/aggregation/bloom/BloomFilterAggregatorTest.java b/extensions-core/druid-bloom-filter/src/test/java/org/apache/druid/query/aggregation/bloom/BloomFilterAggregatorTest.java index 879f3292766a..63422f4bdd73 100644 --- a/extensions-core/druid-bloom-filter/src/test/java/org/apache/druid/query/aggregation/bloom/BloomFilterAggregatorTest.java +++ b/extensions-core/druid-bloom-filter/src/test/java/org/apache/druid/query/aggregation/bloom/BloomFilterAggregatorTest.java @@ -247,7 +247,9 @@ public void testAggregateValues() throws IOException aggregateDimension(Collections.singletonList(dimSelector), agg); } - BloomKFilter bloomKFilter = (BloomKFilter) valueAggregatorFactory.finalizeComputation(agg.get()); + BloomKFilter bloomKFilter = BloomKFilter.deserialize( + (ByteBuffer) valueAggregatorFactory.finalizeComputation(agg.get()) + ); String serialized = filterToString(bloomKFilter); Assert.assertEquals(serializedFilter1, serialized); } @@ -262,7 +264,9 @@ public void testAggregateLongValues() throws IOException aggregateColumn(Collections.singletonList(selector), agg); } - BloomKFilter bloomKFilter = (BloomKFilter) valueAggregatorFactory.finalizeComputation(agg.get()); + BloomKFilter bloomKFilter = BloomKFilter.deserialize( + (ByteBuffer) valueAggregatorFactory.finalizeComputation(agg.get()) + ); String serialized = filterToString(bloomKFilter); Assert.assertEquals(serializedLongFilter, serialized); } @@ -277,7 +281,9 @@ public void testAggregateFloatValues() throws IOException aggregateColumn(Collections.singletonList(selector), agg); } - BloomKFilter bloomKFilter = (BloomKFilter) valueAggregatorFactory.finalizeComputation(agg.get()); + BloomKFilter bloomKFilter = BloomKFilter.deserialize( + (ByteBuffer) valueAggregatorFactory.finalizeComputation(agg.get()) + ); String serialized = filterToString(bloomKFilter); Assert.assertEquals(serializedFloatFilter, serialized); } @@ -292,7 +298,9 @@ public void testAggregateDoubleValues() throws IOException aggregateColumn(Collections.singletonList(selector), agg); } - BloomKFilter bloomKFilter = (BloomKFilter) valueAggregatorFactory.finalizeComputation(agg.get()); + BloomKFilter bloomKFilter = BloomKFilter.deserialize( + (ByteBuffer) valueAggregatorFactory.finalizeComputation(agg.get()) + ); String serialized = filterToString(bloomKFilter); Assert.assertEquals(serializedDoubleFilter, serialized); } @@ -313,7 +321,9 @@ public void testBufferAggregateStringValues() throws IOException for (int i = 0; i < values2.size(); ++i) { bufferAggregateDimension(Collections.singletonList(dimSelector), agg, buf, pos); } - BloomKFilter bloomKFilter = (BloomKFilter) valueAggregatorFactory.finalizeComputation(agg.get(buf, pos)); + BloomKFilter bloomKFilter = BloomKFilter.deserialize( + (ByteBuffer) valueAggregatorFactory.finalizeComputation(agg.get(buf, pos)) + ); String serialized = filterToString(bloomKFilter); Assert.assertEquals(serializedFilter2, serialized); } @@ -333,7 +343,9 @@ public void testBufferAggregateLongValues() throws IOException IntStream.range(0, longValues1.length) .forEach(i -> bufferAggregateColumn(Collections.singletonList(selector), agg, buf, pos)); - BloomKFilter bloomKFilter = (BloomKFilter) valueAggregatorFactory.finalizeComputation(agg.get(buf, pos)); + BloomKFilter bloomKFilter = BloomKFilter.deserialize( + (ByteBuffer) valueAggregatorFactory.finalizeComputation(agg.get(buf, pos)) + ); String serialized = filterToString(bloomKFilter); Assert.assertEquals(serializedLongFilter, serialized); } @@ -353,7 +365,9 @@ public void testBufferAggregateFloatValues() throws IOException IntStream.range(0, floatValues1.length) .forEach(i -> bufferAggregateColumn(Collections.singletonList(selector), agg, buf, pos)); - BloomKFilter bloomKFilter = (BloomKFilter) valueAggregatorFactory.finalizeComputation(agg.get(buf, pos)); + BloomKFilter bloomKFilter = BloomKFilter.deserialize( + (ByteBuffer) valueAggregatorFactory.finalizeComputation(agg.get(buf, pos)) + ); String serialized = filterToString(bloomKFilter); Assert.assertEquals(serializedFloatFilter, serialized); } @@ -373,7 +387,9 @@ public void testBufferAggregateDoubleValues() throws IOException IntStream.range(0, doubleValues1.length) .forEach(i -> bufferAggregateColumn(Collections.singletonList(selector), agg, buf, pos)); - BloomKFilter bloomKFilter = (BloomKFilter) valueAggregatorFactory.finalizeComputation(agg.get(buf, pos)); + BloomKFilter bloomKFilter = BloomKFilter.deserialize( + (ByteBuffer) valueAggregatorFactory.finalizeComputation(agg.get(buf, pos)) + ); String serialized = filterToString(bloomKFilter); Assert.assertEquals(serializedDoubleFilter, serialized); } @@ -394,10 +410,12 @@ public void testCombineValues() throws IOException aggregateDimension(Collections.singletonList(dimSelector2), agg2); } - BloomKFilter combined = (BloomKFilter) valueAggregatorFactory.finalizeComputation( - valueAggregatorFactory.combine( - agg1.get(), - agg2.get() + BloomKFilter combined = BloomKFilter.deserialize( + (ByteBuffer) valueAggregatorFactory.finalizeComputation( + valueAggregatorFactory.combine( + agg1.get(), + agg2.get() + ) ) ); @@ -424,8 +442,9 @@ public void testMergeValues() throws IOException } - BloomKFilter merged = (BloomKFilter) valueAggregatorFactory.getCombiningFactory() - .finalizeComputation(mergeAggregator.get()); + BloomKFilter merged = BloomKFilter.deserialize( + (ByteBuffer) valueAggregatorFactory.getCombiningFactory().finalizeComputation(mergeAggregator.get()) + ); String serialized = filterToString(merged); Assert.assertEquals(serializedCombinedFilter, serialized); } @@ -449,8 +468,9 @@ public void testMergeValuesWithBuffersForGroupByV1() throws IOException } - BloomKFilter merged = (BloomKFilter) valueAggregatorFactory.getCombiningFactory() - .finalizeComputation(mergeAggregator.get()); + BloomKFilter merged = BloomKFilter.deserialize( + (ByteBuffer) valueAggregatorFactory.getCombiningFactory().finalizeComputation(mergeAggregator.get()) + ); String serialized = filterToString(merged); Assert.assertEquals(serializedCombinedFilter, serialized); } @@ -479,8 +499,9 @@ public void testBuferMergeValues() throws IOException bufferAggregateColumn(Collections.singletonList(mergeDim), mergeAggregator, buf, pos); } - BloomKFilter merged = (BloomKFilter) valueAggregatorFactory.getCombiningFactory() - .finalizeComputation(mergeAggregator.get(buf, pos)); + BloomKFilter merged = BloomKFilter.deserialize( + (ByteBuffer) valueAggregatorFactory.getCombiningFactory().finalizeComputation(mergeAggregator.get(buf, pos)) + ); String serialized = filterToString(merged); Assert.assertEquals(serializedCombinedFilter, serialized); diff --git a/extensions-core/druid-bloom-filter/src/test/java/org/apache/druid/query/aggregation/bloom/BloomFilterGroupByQueryTest.java b/extensions-core/druid-bloom-filter/src/test/java/org/apache/druid/query/aggregation/bloom/BloomFilterGroupByQueryTest.java index a2207f2e6423..ce3b9322d743 100644 --- a/extensions-core/druid-bloom-filter/src/test/java/org/apache/druid/query/aggregation/bloom/BloomFilterGroupByQueryTest.java +++ b/extensions-core/druid-bloom-filter/src/test/java/org/apache/druid/query/aggregation/bloom/BloomFilterGroupByQueryTest.java @@ -42,6 +42,7 @@ import org.junit.runners.Parameterized; import java.io.IOException; +import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Collection; import java.util.List; @@ -108,9 +109,10 @@ public void testQuery() throws Exception MapBasedRow row = ingestAndQuery(query); - Assert.assertTrue(((BloomKFilter) row.getRaw("blooming_quality")).testString("mezzanine")); - Assert.assertTrue(((BloomKFilter) row.getRaw("blooming_quality")).testString("premium")); - Assert.assertFalse(((BloomKFilter) row.getRaw("blooming_quality")).testString("entertainment")); + BloomKFilter filter = BloomKFilter.deserialize((ByteBuffer) row.getRaw("blooming_quality")); + Assert.assertTrue(filter.testString("mezzanine")); + Assert.assertTrue(filter.testString("premium")); + Assert.assertFalse(filter.testString("entertainment")); } @Test @@ -135,7 +137,7 @@ public void testQueryFakeDimension() throws Exception Object val = row.getRaw("blooming_quality"); - String serialized = BloomFilterAggregatorTest.filterToString((BloomKFilter) val); + String serialized = BloomFilterAggregatorTest.filterToString(BloomKFilter.deserialize((ByteBuffer) val)); String empty = BloomFilterAggregatorTest.filterToString(filter); Assert.assertEquals(empty, serialized); From bddd4fd399ba9edf663a36edcae069ae6c68574f Mon Sep 17 00:00:00 2001 From: Clint Wylie Date: Tue, 16 Apr 2019 22:22:50 -0700 Subject: [PATCH 04/10] javadoc --- .../bloom/BaseBloomFilterAggregator.java | 40 ++++++++++++------- 1 file changed, 25 insertions(+), 15 deletions(-) diff --git a/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/BaseBloomFilterAggregator.java b/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/BaseBloomFilterAggregator.java index 5a5403c4a61f..732c610e057b 100644 --- a/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/BaseBloomFilterAggregator.java +++ b/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/BaseBloomFilterAggregator.java @@ -26,11 +26,31 @@ import org.apache.druid.segment.BaseFloatColumnValueSelector; import org.apache.druid.segment.BaseLongColumnValueSelector; import org.apache.druid.segment.BaseNullableColumnValueSelector; +import org.apache.druid.segment.ColumnSelectorFactory; import org.apache.druid.segment.DimensionSelector; import javax.annotation.Nullable; import java.nio.ByteBuffer; +/** + * All bloom filter aggregations are done using {@link ByteBuffer}, so this base class implements both {@link Aggregator} + * and {@link BufferAggregator}. + * + * If used as an {@link Aggregator} the caller MUST specify the 'onHeap' parameter in the + * constructor as "true", or else the "collector" will not be allocated and null pointer exceptions will make things sad. + * + * If used as a {@link BufferAggregator}, the "collector" buffer is not necessary, and should be called with "false", + * but at least nothing dramatic will happen like incorrect use in the {@link Aggregator} case. + * + * {@link BloomFilterAggregatorFactory} and {@link BloomFilterMergeAggregatorFactory}, which should be the creators of + * all implementations of {@link BaseBloomFilterAggregator} outside of tests, should be sure to set the 'onHeap' value + * to "true" and "false" respectively for + * {@link org.apache.druid.query.aggregation.AggregatorFactory#factorize(ColumnSelectorFactory)} and + * {@link org.apache.druid.query.aggregation.AggregatorFactory#factorizeBuffered(ColumnSelectorFactory)} + * + * @param type of {@link BaseNullableColumnValueSelector} that feeds this aggregator, likely either values + * to add to a bloom filter, or other bloom filters to merge into this bloom filter. + */ public abstract class BaseBloomFilterAggregator implements BufferAggregator, Aggregator { @@ -39,6 +59,11 @@ public abstract class BaseBloomFilterAggregator Date: Tue, 16 Apr 2019 23:07:11 -0700 Subject: [PATCH 05/10] clean up unused test method --- .../aggregation/bloom/BloomFilterAggregatorTest.java | 8 -------- 1 file changed, 8 deletions(-) diff --git a/extensions-core/druid-bloom-filter/src/test/java/org/apache/druid/query/aggregation/bloom/BloomFilterAggregatorTest.java b/extensions-core/druid-bloom-filter/src/test/java/org/apache/druid/query/aggregation/bloom/BloomFilterAggregatorTest.java index 63422f4bdd73..da4479beca9a 100644 --- a/extensions-core/druid-bloom-filter/src/test/java/org/apache/druid/query/aggregation/bloom/BloomFilterAggregatorTest.java +++ b/extensions-core/druid-bloom-filter/src/test/java/org/apache/druid/query/aggregation/bloom/BloomFilterAggregatorTest.java @@ -622,14 +622,6 @@ public boolean isNull() } } - public static class TestBloomFilterColumnSelector extends SteppableSelector - { - public TestBloomFilterColumnSelector(List values) - { - super(values); - } - } - public static class TestBloomFilterBufferColumnSelector extends SteppableSelector { public TestBloomFilterBufferColumnSelector(List values) From f2ab542a54d63aedc3fd6fb91c3f4542497ca6da Mon Sep 17 00:00:00 2001 From: Clint Wylie Date: Tue, 16 Apr 2019 23:20:14 -0700 Subject: [PATCH 06/10] fix exception message --- .../aggregation/bloom/BaseBloomFilterAggregator.java | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/BaseBloomFilterAggregator.java b/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/BaseBloomFilterAggregator.java index 732c610e057b..8a6d4319f89a 100644 --- a/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/BaseBloomFilterAggregator.java +++ b/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/BaseBloomFilterAggregator.java @@ -115,19 +115,19 @@ public Object get(ByteBuffer buf, int position) @Override public float getFloat(ByteBuffer buf, int position) { - throw new UnsupportedOperationException("BloomFilterBufferAggregator does not support getFloat()"); + throw new UnsupportedOperationException("BloomFilterAggregator does not support getFloat()"); } @Override public long getLong(ByteBuffer buf, int position) { - throw new UnsupportedOperationException("BloomFilterBufferAggregator does not support getLong()"); + throw new UnsupportedOperationException("BloomFilterAggregator does not support getLong()"); } @Override public double getDouble(ByteBuffer buf, int position) { - throw new UnsupportedOperationException("BloomFilterBufferAggregator does not support getDouble()"); + throw new UnsupportedOperationException("BloomFilterAggregator does not support getDouble()"); } @Override @@ -146,19 +146,19 @@ public Object get() @Override public float getFloat() { - throw new UnsupportedOperationException("BloomFilterBufferAggregator does not support getFloat()"); + throw new UnsupportedOperationException("BloomFilterAggregator does not support getFloat()"); } @Override public long getLong() { - throw new UnsupportedOperationException("BloomFilterBufferAggregator does not support getLong()"); + throw new UnsupportedOperationException("BloomFilterAggregator does not support getLong()"); } @Override public double getDouble() { - throw new UnsupportedOperationException("BloomFilterBufferAggregator does not support getDouble()"); + throw new UnsupportedOperationException("BloomFilterAggregator does not support getDouble()"); } @Override From f139852e6d5177635bb4d89dce350caba3ea8490 Mon Sep 17 00:00:00 2001 From: Clint Wylie Date: Wed, 17 Apr 2019 00:20:22 -0700 Subject: [PATCH 07/10] style --- .../query/aggregation/bloom/BaseBloomFilterAggregator.java | 3 --- .../query/aggregation/bloom/BloomFilterAggregatorFactory.java | 3 +-- 2 files changed, 1 insertion(+), 5 deletions(-) diff --git a/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/BaseBloomFilterAggregator.java b/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/BaseBloomFilterAggregator.java index 8a6d4319f89a..8389c1ebb350 100644 --- a/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/BaseBloomFilterAggregator.java +++ b/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/BaseBloomFilterAggregator.java @@ -23,11 +23,8 @@ import org.apache.druid.query.aggregation.BufferAggregator; import org.apache.druid.query.filter.BloomKFilter; import org.apache.druid.query.monomorphicprocessing.RuntimeShapeInspector; -import org.apache.druid.segment.BaseFloatColumnValueSelector; -import org.apache.druid.segment.BaseLongColumnValueSelector; import org.apache.druid.segment.BaseNullableColumnValueSelector; import org.apache.druid.segment.ColumnSelectorFactory; -import org.apache.druid.segment.DimensionSelector; import javax.annotation.Nullable; import java.nio.ByteBuffer; diff --git a/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/BloomFilterAggregatorFactory.java b/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/BloomFilterAggregatorFactory.java index 72eadeb20a61..972803559416 100644 --- a/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/BloomFilterAggregatorFactory.java +++ b/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/BloomFilterAggregatorFactory.java @@ -40,7 +40,6 @@ import org.apache.druid.segment.column.ValueType; import javax.annotation.Nullable; -import java.io.IOException; import java.nio.ByteBuffer; import java.util.Collections; import java.util.Comparator; @@ -223,7 +222,7 @@ public Object deserialize(Object object) return ByteBuffer.wrap(StringUtils.decodeBase64String((String) object)); } else if (object instanceof byte[]) { return ByteBuffer.wrap((byte[]) object); - } else { + } else { return object; } } From 0c7b1443d73a795bf14bae429de7adb163281543 Mon Sep 17 00:00:00 2001 From: Clint Wylie Date: Wed, 17 Apr 2019 00:50:44 -0700 Subject: [PATCH 08/10] why does style hate javadocs --- .../query/aggregation/bloom/BaseBloomFilterAggregator.java | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/BaseBloomFilterAggregator.java b/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/BaseBloomFilterAggregator.java index 8389c1ebb350..9bf2a81f62b9 100644 --- a/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/BaseBloomFilterAggregator.java +++ b/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/BaseBloomFilterAggregator.java @@ -24,7 +24,6 @@ import org.apache.druid.query.filter.BloomKFilter; import org.apache.druid.query.monomorphicprocessing.RuntimeShapeInspector; import org.apache.druid.segment.BaseNullableColumnValueSelector; -import org.apache.druid.segment.ColumnSelectorFactory; import javax.annotation.Nullable; import java.nio.ByteBuffer; @@ -42,8 +41,8 @@ * {@link BloomFilterAggregatorFactory} and {@link BloomFilterMergeAggregatorFactory}, which should be the creators of * all implementations of {@link BaseBloomFilterAggregator} outside of tests, should be sure to set the 'onHeap' value * to "true" and "false" respectively for - * {@link org.apache.druid.query.aggregation.AggregatorFactory#factorize(ColumnSelectorFactory)} and - * {@link org.apache.druid.query.aggregation.AggregatorFactory#factorizeBuffered(ColumnSelectorFactory)} + * {@link org.apache.druid.query.aggregation.AggregatorFactory#factorize} and + * {@link org.apache.druid.query.aggregation.AggregatorFactory#factorizeBuffered} * * @param type of {@link BaseNullableColumnValueSelector} that feeds this aggregator, likely either values * to add to a bloom filter, or other bloom filters to merge into this bloom filter. From a7b25fabb76a6e15146986b619e32df7508b6a0a Mon Sep 17 00:00:00 2001 From: Clint Wylie Date: Wed, 17 Apr 2019 23:54:04 -0700 Subject: [PATCH 09/10] review stuff --- .../bloom/BaseBloomFilterAggregator.java | 14 ++-- .../bloom/BloomFilterAggregateCombiner.java | 77 ------------------- .../bloom/BloomFilterAggregatorFactory.java | 2 +- 3 files changed, 9 insertions(+), 84 deletions(-) delete mode 100644 extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/BloomFilterAggregateCombiner.java diff --git a/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/BaseBloomFilterAggregator.java b/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/BaseBloomFilterAggregator.java index 9bf2a81f62b9..da297186f913 100644 --- a/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/BaseBloomFilterAggregator.java +++ b/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/BaseBloomFilterAggregator.java @@ -50,8 +50,8 @@ public abstract class BaseBloomFilterAggregator implements BufferAggregator, Aggregator { - - protected final ByteBuffer collector; + @Nullable + private final ByteBuffer collector; protected final int maxNumEntries; protected final TSelector selector; @@ -88,9 +88,12 @@ public void init(ByteBuffer buf, int position) public void aggregate(ByteBuffer buf, int position) { final int oldPosition = buf.position(); - buf.position(position); - bufferAdd(buf); - buf.position(oldPosition); + try { + buf.position(position); + bufferAdd(buf); + } finally { + buf.position(oldPosition); + } } @Override @@ -98,7 +101,6 @@ public Object get(ByteBuffer buf, int position) { ByteBuffer mutationBuffer = buf.duplicate(); mutationBuffer.position(position); - // | k (byte) | numLongs (int) | bitset (long[numLongs]) | int sizeBytes = BloomKFilter.computeSizeBytes(maxNumEntries); mutationBuffer.limit(position + sizeBytes); diff --git a/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/BloomFilterAggregateCombiner.java b/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/BloomFilterAggregateCombiner.java deleted file mode 100644 index e5daddea42d8..000000000000 --- a/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/BloomFilterAggregateCombiner.java +++ /dev/null @@ -1,77 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.druid.query.aggregation.bloom; - -import org.apache.druid.query.aggregation.ObjectAggregateCombiner; -import org.apache.druid.query.filter.BloomKFilter; -import org.apache.druid.segment.ColumnValueSelector; - -import javax.annotation.Nullable; -import java.nio.ByteBuffer; - -public class BloomFilterAggregateCombiner extends ObjectAggregateCombiner -{ - @Nullable - private ByteBuffer combined; - - private final int maxNumEntries; - - public BloomFilterAggregateCombiner(int maxNumEntries) - { - this.maxNumEntries = maxNumEntries; - } - - @Override - public void reset(ColumnValueSelector selector) - { - combined = null; - fold(selector); - } - - @Override - public void fold(ColumnValueSelector selector) - { - ByteBuffer other = (ByteBuffer) selector.getObject(); - if (other == null) { - return; - } - if (combined == null) { - BloomKFilter bloomFilter = new BloomKFilter(maxNumEntries); - ByteBuffer buffer = ByteBuffer.allocate(BloomKFilter.computeSizeBytes(maxNumEntries)); - BloomKFilter.serialize(buffer, bloomFilter); - } - - BloomKFilter.mergeBloomFilterByteBuffers(combined, 0, other, 0); - } - - - @Nullable - @Override - public ByteBuffer getObject() - { - return combined; - } - - @Override - public Class classOfObject() - { - return ByteBuffer.class; - } -} diff --git a/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/BloomFilterAggregatorFactory.java b/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/BloomFilterAggregatorFactory.java index 972803559416..74e921c3497e 100644 --- a/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/BloomFilterAggregatorFactory.java +++ b/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/BloomFilterAggregatorFactory.java @@ -200,7 +200,7 @@ public Object combine(@Nullable Object lhs, @Nullable Object rhs) @Override public AggregateCombiner makeAggregateCombiner() { - return new BloomFilterAggregateCombiner(maxNumEntries); + throw new UnsupportedOperationException("Bloom filter aggregators are query-time only"); } @Override From cf24b4ea1d3b6c3967c0b54c3fe4de523dac83c2 Mon Sep 17 00:00:00 2001 From: Clint Wylie Date: Thu, 18 Apr 2019 09:05:51 -0700 Subject: [PATCH 10/10] style :( --- .../query/aggregation/bloom/BaseBloomFilterAggregator.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/BaseBloomFilterAggregator.java b/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/BaseBloomFilterAggregator.java index da297186f913..48ba08327537 100644 --- a/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/BaseBloomFilterAggregator.java +++ b/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/BaseBloomFilterAggregator.java @@ -91,7 +91,8 @@ public void aggregate(ByteBuffer buf, int position) try { buf.position(position); bufferAdd(buf); - } finally { + } + finally { buf.position(oldPosition); } }