Adds bloom filter aggregator to 'druid-bloom-filters' extension #6397

leventov merged 41 commits into apache:master
Conversation
Force-pushed a4f9984 to a75b278
Consider making this valid JSON so it doesn't get syntax highlighted
I believe that this is really only ugly on github and it looks ok translated to the website docs
Refers to bloom aggregator here, but in the JSON spec the type is bloomFilter.
bloom is the correct value to be consistent with the filter type name, updated docs to reflect that.
I think it'd be worthwhile under maxNumEntries to discuss the implications of having more elements than the value provided here. Also, any discussion on how to choose an appropriate value here to get a given false-positive rate would also be helpful.
Hmm, digging into it, in BloomKFilter the false positive rate is not controllable in the manner of BloomFilter, and is fixed to the default of 5%. However I guess that can be indirectly controlled by increasing the maxNumEntries, though that's kind of lame. Having a higher cardinality than the value of maxNumEntries will cause the false positive probability to reach 1, constructing a useless bloom filter, so that should definitely be added to the docs.
Updated docs to include fixed 5% false positive rate, though no formula for how changing maxNumEntries affects that yet.
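For reference, the relationship between maxNumEntries and the false positive rate follows the standard Bloom filter formulas; a minimal sketch of that math (generic Bloom filter theory, which may differ from BloomKFilter's exact internals):

```java
// Standard Bloom filter sizing math (generic theory, not necessarily
// BloomKFilter's implementation): for n expected entries and target false
// positive probability p, derive the bit count m and hash count k.
public class BloomSizing
{
  static long optimalNumBits(long n, double p)
  {
    return (long) Math.ceil(-n * Math.log(p) / (Math.log(2) * Math.log(2)));
  }

  static int optimalNumHashes(long n, long m)
  {
    return Math.max(1, (int) Math.round((double) m / n * Math.log(2)));
  }

  // expected false positive probability after 'inserted' distinct elements
  static double falsePositiveProbability(long m, int k, long inserted)
  {
    return Math.pow(1 - Math.exp(-(double) k * inserted / m), k);
  }

  public static void main(String[] args)
  {
    long n = 100_000;  // maxNumEntries
    long m = optimalNumBits(n, 0.05);
    int k = optimalNumHashes(n, m);
    // at design capacity the filter sits near the 5% target...
    System.out.printf("bits=%d hashes=%d fppAtCapacity=%.3f%n",
        m, k, falsePositiveProbability(m, k, n));
    // ...but exceeding maxNumEntries by 10x drives the rate toward 1
    System.out.printf("fppAt10x=%.3f%n", falsePositiveProbability(m, k, 10 * n));
  }
}
```

This is why a cardinality well above maxNumEntries makes the filter useless, as noted above.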
Perhaps make these final and similarly for other classes
There's quite a lot of duplicate or similar code between this and BloomFilterBufferAggregator. Any opportunity to consolidate?
Consolidated common code into BaseBloomFilterAggregator and BaseBloomFilterBufferAggregator
0x30 and 0x31 are already in use. Also, position those variables at the end of the list.
Oops, originally did this work in an older branch where these didn't exist yet I think, will fix.
Could you make this kind of mistake impossible in the future by moving all codes into an enum with a `byte id;` field, and adding a static initializer that checks that all enum constants have different codes?
Also change CacheKeyBuilder's constructor to accept the enum constant instead of a byte param, to prohibit bypassing this enum.
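The suggested pattern might look like this sketch (enum name and byte values are illustrative, not Druid's actual cache id registry):

```java
import java.util.HashSet;
import java.util.Set;

// Illustrative sketch of the suggested registry: every cache key code lives in
// one enum, and a static check makes duplicate byte ids fail at class load.
public enum AggregatorCacheId
{
  CARDINALITY((byte) 0x30),
  BLOOM_FILTER((byte) 0x31),
  BLOOM_FILTER_MERGE((byte) 0x32);

  private final byte id;

  AggregatorCacheId(byte id)
  {
    this.id = id;
  }

  public byte getId()
  {
    return id;
  }

  static {
    // a duplicate code throws here, so the mistake can't reach a release
    Set<Byte> seen = new HashSet<>();
    for (AggregatorCacheId constant : values()) {
      if (!seen.add(constant.getId())) {
        throw new IllegalStateException("Duplicate cache id: " + constant);
      }
    }
  }
}
```

A CacheKeyBuilder overload accepting the enum constant rather than a raw byte would then make bypassing the registry impossible.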
Could avoid creating an array by iterating the row itself.
DoubleColumnSelector must appear only in implements clauses and nowhere else. See its Javadoc. Same for float and long.
Please add comment "nothing to close"
Please add a comment "nothing to close"
Are you sure that "Aggregator" should be part of the name of this class?
I think it's not necessary, though I was just following cardinality aggregator.
Cache the value in a static, don't create a throwaway each time.
What sort of cache is most appropriate? A static int2object map? caffeine?
"cache" = set to a static final field, I meant here
I don't think a static works since the result depends on maxNumEntries, which is a query time parameter. If this method gets called multiple times I can set an instance field to this value at constructor time, or I can see if the math that computes the size of the long array inside is accessible to call that directly.
Direct computation, if not supported by the library directly (maybe we could contribute that?) could be dangerous if the algorithm in the library changes in some version.
I think most of the time only one maxNumEntries value will be seen per JVM; a small Int2Object map (stopped being populated after, say, 10 entries) should work.
*Int2IntMap, you could cache the final values.
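The bounded-memoization idea could be sketched like this (illustrative only; a plain HashMap stands in for fastutil's Int2IntMap to keep the example self-contained):

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.IntUnaryOperator;

// Caches a per-maxNumEntries size computation, and stops accepting new keys
// after a small bound since few distinct values are expected per JVM.
public class BoundedSizeCache
{
  private static final int MAX_CACHED_KEYS = 10;

  private final Map<Integer, Integer> cache = new HashMap<>();
  private final IntUnaryOperator compute;

  public BoundedSizeCache(IntUnaryOperator compute)
  {
    this.compute = compute;
  }

  public int get(int maxNumEntries)
  {
    Integer cached = cache.get(maxNumEntries);
    if (cached != null) {
      return cached;
    }
    int size = compute.applyAsInt(maxNumEntries);
    if (cache.size() < MAX_CACHED_KEYS) {
      cache.put(maxNumEntries, size);
    }
    return size;
  }

  public static void main(String[] args)
  {
    int[] calls = {0};
    BoundedSizeCache cache = new BoundedSizeCache(n -> {
      calls[0]++;
      return n * 8;  // stand-in for the real serialized-size computation
    });
    cache.get(100_000);
    cache.get(100_000);
    System.out.println("computed " + calls[0] + " time(s)");  // computed 1 time(s)
  }
}
```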
Added a method to our copied BloomKFilter to compute the size required given a number of entries, avoiding this throwaway
It's a header written during serialization. I'll add a comment/link to https://github.com/apache/hive/blob/master/storage-api/src/java/org/apache/hive/common/util/BloomKFilter.java#L302 or see if I can find a better way to get this information.
Now uses new method on copied BloomKFilter to compute the size
Could compare the number of set bits.
Added method to copied BloomKFilter to count the number of set bits, allowing ordering results by density of bloom filter
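The set-bit count backing that comparator is cheap to compute with Long.bitCount; a minimal standalone sketch (not the actual BloomKFilter method):

```java
// Counts set bits across a bloom filter's backing long[] words; a denser
// filter (more set bits) can then be ordered against a sparser one.
public class BloomBitCount
{
  static long countSetBits(long[] words)
  {
    long count = 0;
    for (long word : words) {
      count += Long.bitCount(word);  // popcount per 64-bit word
    }
    return count;
  }

  public static void main(String[] args)
  {
    // 0b1011 has 3 set bits, -1L has all 64 set, 0L has none
    System.out.println(countSetBits(new long[]{0b1011L, -1L, 0L}));  // 67
  }
}
```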
What does this variable name mean?
Heh, oops, missed some clean up before opening the PR
Force-pushed fb270f6 to 03d6b9e
This PR is dependent on #6546 being resolved.
Force-pushed 03d6b9e to bf44d3f
Force-pushed bf44d3f to 21eb78f
@dclim, @leventov (and anyone else interested) I think this PR is ready for review again. I think I've addressed all existing comments via code changes; apologies for the rebases - I jumped the gun a bit opening this before the rest of the bloom filter extension had stabilized. I've also added a bunch of methods to our copy of the Hive BloomKFilter to enable in situ manipulation of BloomKFilters that are serialized into a ByteBuffer, to allow much more memory efficient buffer aggs, which I will attempt to get pushed to the upstream implementation soon to avoid diverging too much. Additionally the documentation has been improved a bit for the extension, and I have simplified the code where possible in response to comments.
Any more comments on this PR, @leventov?
@leventov would you mind if I merge this soon so I can unblock another branch I'm sitting on that adds SQL support on top of this? If you find any additional issues I am happy to address them in a future patch.
```java
@Override
public void inspectRuntimeShape(RuntimeShapeInspector inspector)
{
  inspector.visit("selector", selectorPlus.getSelector());
}
```
Apparently selectorPlus.getColumnSelectorStrategy() should be inspected too. Please, don't believe me and read the documentation of HotLoopCallee.inspectRuntimeShape() to verify that.
After refactor all that is left is 'selector' which i think is all that needs inspected.
```java
collector.merge((BloomKFilter) other);
} else if (other instanceof ByteBuffer) {
  // fun fact: because bloom filter agg factory deserialize returns a byte buffer to avoid unnecessary serde,
  // but group by v1 ends up trying to merge bytebuffers from buffer aggs with this agg instead of the buffer
```
Please refer to this as GroupByQueryEngine (so-called groupBy V1) to ease navigation and for clarity.
```java
{
  final ColumnValueSelector<BloomKFilter> selector = metricFactory.makeColumnValueSelector(fieldName);
  if (selector instanceof NilColumnValueSelector) {
    throw new ISE("WTF?! Unexpected NilColumnValueSelector");
```
Why this? Many other object aggregators support absent columns.
To expand on this comment, when examining how this would be called my conclusion was that this would be an unexpected condition in the merge aggregator, because the output of a null column from the aggregator would still be a bloom filter with the null or default value which is what this would see. Did I misinterpret or miss a situation where this could actually happen?
I just noticed that this is the merge aggregator. Then it probably makes sense, but there should be a comment and the error message should be more descriptive.
But then it appears that BloomFilterAggregatorFactory doesn't special-case NilColumnValueSelector, however it probably should for performance.
After refactor this now special cases NilColumnValueSelector and I think behaves in a slightly more correct manner of producing a totally empty bloom filter instead of a bloom filter with a null value
```java
import java.nio.ByteBuffer;

/**
 * This exists so bloom filter agg has something to register so group by v1 will work, but isn't actually used
```
Please make "bloom filter agg" and "group by v1" Javadoc links.
```java
@Override
public ComplexMetricExtractor getExtractor()
{
  throw new UnsupportedOperationException("How can this be?");
}
```
A message like "Bloom filter aggregator is query-time only" might be more constructive.
```java
import java.nio.ByteBuffer;

public interface BloomFilterAggregatorColumnSelectorStrategy<TValueSelector> extends ColumnSelectorStrategy
```
It seems to me that BloomFilterAggregatorColumnSelectorStrategy and BloomFilterAggregatorColumnSelectorStrategyFactory are too shallow abstractions. Probably it's better to inline the logic of BloomFilterAggregatorColumnSelectorStrategyFactory into factorize() and factorizeBuffered() and the logic of BloomFilterAggregatorColumnSelectorStrategy into respective subclasses of BloomFilterBufferAggregator and BloomFilterAggregator.
I used the same abstraction as CardinalityAggregator, should it be refactored in this manner as well? (not in this PR, but later at least)
To clarify you're suggesting something like having factorize produce a StringBloomFilterAggregator, LongBloomFilterAggregator, etc with the same for buffer aggs? I'll have a look at reworking it.
Yes, exactly. I think this Strategy and StrategyFactory is too much and the logic is lost between endless classes.
Yes, I think cardinality should be refactored too. I've created #6909.
Refactored as suggested; fairly large shuffling around of stuff, but I think it is maybe cleaner 👍
```java
{
  if (selector.getRow().size() > 1) {
    selector.getRow().forEach(v -> {
      String value = selector.lookupName(v);
```
I guess when DimensionSelector.nameLookupPossibleInAdvance() is true for this dimension selector, it might be much faster to hash indexes rather than strings. See other usages of nameLookupPossibleInAdvance() in the codebase.
Javadocs aren't super clear and it isn't obvious to me yet looking at code, does nameLookupPossibleInAdvance indicate that indexes uniquely map to the same value across all segments? That would need to be true for this to work, and it would also require modifications to the BloomDimFilter implementation to be able to work with this as well.
I think the main use case of this aggregator right now is to be able to produce bloom filters from data in druid, which can be used as input to additional queries to filter other data in druid using BloomDimFilter which this extension originally introduced, so the goal is to have them operate in the same manner.
It definitely is expensive to do the hashing, it feels even apparent on the BloomDimFilter side of things, but I think this optimization is maybe out of scope of this PR
> Javadocs aren't super clear and it isn't obvious to me yet looking at code, does nameLookupPossibleInAdvance indicate that indexes uniquely map to the same value across all segments? That would need to be true for this to work, and it would also require modifications to the BloomDimFilter implementation to be able to work with this as well.

Yes, map uniquely. Feel free to clarify that Javadoc right in this PR.

> I think the main use case of this aggregator right now is to be able to produce bloom filters from data in druid, which can be used as input to additional queries to filter other data in druid using BloomDimFilter which this extension originally introduced, so the goal is to have them operate in the same manner.
>
> It definitely is expensive to do the hashing, it feels even apparent on the BloomDimFilter side of things, but I think this optimization is maybe out of scope of this PR

OK, then please leave a comment somewhere with an explanation why the optimization is not applied.
```java
 * ByteBuffer, e.g. all add and merge methods. Test methods were not added because we don't need them.. but would
 * probably be chill to do so it is symmetrical.
 *
 * Todo: remove this and begin using hive-storage-api version again once
```
But above in this PR it is mentioned that the Bloom Filter aggregator is currently compute-time only; how is the Hive integration relevant then?
…yFactory to instead use specialized aggregators for each supported column type, other review comments
```java
{
  final ColumnValueSelector<BloomKFilter> selector = metricFactory.makeColumnValueSelector(fieldName);
  if (selector instanceof NilColumnValueSelector) {
    throw new ISE("WTF?! Unexpected NilColumnValueSelector");
```
```java
return new BloomFilterAggregator(selectorPlus, maxNumEntries);
if (selector instanceof NilColumnValueSelector) {
  return new NilBloomFilterAggregator((NilColumnValueSelector) selector, filter);
```
Is it important that BloomKFilter has specifically maxNumEntries? If not, new BloomKFilter(0) (or 1) could be cached in a constant.
In any case, (NilColumnValueSelector) selector shouldn't be passed down; NilBloomFilterAggregator could call super(NilColumnValueSelector.instance()) in its constructor.
BloomKFilter must be the same size to merge, so it's not possible to make a constant. 👍 on the signature change though, will do that.
Ok, please add a comment noting this.
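The same-size constraint follows from how bloom filter merges work; a hedged standalone sketch (not BloomKFilter's actual merge method):

```java
import java.util.Arrays;

// A bloom filter merge is a bitwise OR of the backing bit arrays, which is
// only meaningful when both filters were built with the same bit count (and
// therefore the same maxNumEntries), hence no shared constant filter.
public class BloomMergeSketch
{
  static long[] merge(long[] a, long[] b)
  {
    if (a.length != b.length) {
      throw new IllegalArgumentException("bloom filters must be the same size to merge");
    }
    long[] merged = new long[a.length];
    for (int i = 0; i < a.length; i++) {
      merged[i] = a[i] | b[i];  // union of the two membership sets
    }
    return merged;
  }

  public static void main(String[] args)
  {
    System.out.println(Arrays.toString(merge(new long[]{0b0011L}, new long[]{0b0110L})));  // [7]
  }
}
```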
```java
import java.util.List;
import java.util.Objects;

public class BloomFilterAggregatorFactory extends AggregatorFactory
```
As far as I could tell while implementing this, makeAggregateCombiner is only called to merge segments at ingestion time; the issue that those pulls reference, #6877, maybe jibes with that since it is an exception during index merging.
The long term idea is to replace the remaining uses of combine() with AggregateCombiner and remove combine() method, to reduce repetition. When this time comes, it will be harder to implement that for every aggregator. So IMO it's better to implement makeAggregateCombiner() in all aggregators in core Druid eagerly.
Added BloomFilterAggregateCombiner
```java
if (capabilities == null) {
  BaseNullableColumnValueSelector selector = columnFactory.makeColumnValueSelector(field.getDimension());
  if (selector instanceof NilColumnValueSelector) {
    return new NilBloomFilterAggregator((NilColumnValueSelector) selector, filter);
```
In some other complex aggregators, this thing is called "NoOp" aggregator. But IMO it would be more correct to call it "Empty". "Nil" sounds like it should return null from Aggregator.get(), but it returns an empty bloom filter.
👍 agree, will change to "Empty"
Could you please rename "NoOp" aggregators (I see it in ArrayOfDoublesSketch), and whatever other nonstandard names other complex aggregators use (I only see that DistinctCount already uses "Empty") and align everything to the same convention?
Created #6934 and assigned to myself to do as a follow-up PR
Still LGTM after latest changes
Thanks for review everyone 🤘
…he#6397)

* blooming aggs
* partially address review
* fix docs
* minor test refactor after rebase
* use copied bloomkfilter
* add ByteBuffer methods to BloomKFilter to allow agg to use in place, simplify some things, more tests
* add methods to BloomKFilter to get number of set bits, use in comparator, fixes
* more docs
* fix
* fix style
* simplify bloomfilter bytebuffer merge, change methods to allow passing buffer offsets
* oof, more fixes
* more sane docs example
* fix it
* do the right thing in the right place
* formatting
* fix
* avoid conflict
* typo fixes, faster comparator, docs for comparator behavior
* unused imports
* use buffer comparator instead of deserializing
* striped readwrite lock for buffer agg, null handling comparator, other review changes
* style fixes
* style
* remove sync for now
* oops
* consistency
* inspect runtime shape of selector instead of selector plus, static comparator, add inner exception on serde exception
* CardinalityBufferAggregator inspect selectors instead of selectorPluses
* fix style
* refactor away from using ColumnSelectorPlus and ColumnSelectorStrategyFactory to instead use specialized aggregators for each supported column type, other review comments
* adjustment
* fix teamcity error?
* rename nil aggs to empty, change empty agg constructor signature, add comments
* use stringutils base64 stuff to be chill with master
* add aggregate combiner, comment
This PR, building on top of the work introduced in #6222, extends `druid-bloom-filters` with a query time aggregator, allowing bloom filters to be computed from query results, which can then be used as input to `bloom` filters in subsequent queries.

Example query:
```json
{
  "queryType": "timeseries",
  "dataSource": "wikiticker",
  "intervals": ["2015-09-12T00:00:00.000/2015-09-13T00:00:00.000"],
  "granularity": "day",
  "aggregations": [
    {
      "type": "bloom",
      "name": "userBloom",
      "maxNumEntries": 100000,
      "field": {
        "type": "default",
        "dimension": "user",
        "outputType": "STRING"
      }
    }
  ]
}
```

Example results:

```json
[{"timestamp":"2015-09-12T00:00:00.000Z","result":{"userBloom":"BAAAJhAAAA..."}}]
```
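As a hedged illustration of the round trip described above (field names follow the extension's BloomDimFilter spec; the truncated base64 value from the example result is shown only as a placeholder), the produced filter could be fed back into a subsequent query:

```json
{
  "queryType": "timeseries",
  "dataSource": "wikiticker",
  "intervals": ["2015-09-12T00:00:00.000/2015-09-13T00:00:00.000"],
  "granularity": "day",
  "filter": {
    "type": "bloom",
    "dimension": "user",
    "bloomKFilter": "BAAAJhAAAA..."
  },
  "aggregations": [
    { "type": "count", "name": "rows" }
  ]
}
```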