
KAFKA-4850: Enable bloomfilters #6012

Merged
guozhangwang merged 7 commits into apache:trunk from bbejeck:MINOR_enable_bloom_filters on Jan 24, 2019


@bbejeck
Member

@bbejeck bbejeck commented Dec 7, 2018

This PR enables BloomFilters for RocksDB to speed up point lookups.
The request for this has been around for some time - https://issues.apache.org/jira/browse/KAFKA-4850

For testing, I've done the following:

  1. Ran the standard streams suite of unit and integration tests
  2. Kicked off the simple benchmark test with bloom filters enabled
  3. Kicked off the simple benchmark test with bloom filters not enabled
  4. Kicked off streams system tests

Committer Checklist (excluded from commit message)

  • Verify design and implementation
  • Verify test coverage and CI build status
  • Verify documentation (including upgrade notes)

@bbejeck
Member Author

bbejeck commented Dec 7, 2018

ping @guozhangwang, @mjsax, and @vvcephei for reviews

@SuppressWarnings("unchecked")
public void openDB(final ProcessorContext context) {
// initialize the default rocksdb options
protected TableFormatConfig getTableConfig() {
Member Author


Refactor table config to method

}

@Override
protected TableFormatConfig getTableConfig() {
Member Author


For windowed stores, don't enable bloom filters
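The "refactor table config to method" change is a template-method hook: openDB() takes its table format from getTableConfig(), and windowed stores override the hook to leave the Bloom filter off. The sketch below shows only that shape. TableConfigSketch, KeyValueStoreSketch and WindowStoreSketch are hypothetical stand-ins, not Kafka or RocksDB classes, and the boolean flag takes the place of tableConfig.setFilter(new BloomFilter()) so the example compiles without rocksdbjni.

```java
// Hypothetical stand-in for RocksDB's BlockBasedTableConfig (not a real API).
class TableConfigSketch {
    final long blockCacheSize;
    final long blockSize;
    final boolean bloomFilter; // plays the role of setFilter(new BloomFilter())

    TableConfigSketch(long blockCacheSize, long blockSize, boolean bloomFilter) {
        this.blockCacheSize = blockCacheSize;
        this.blockSize = blockSize;
        this.bloomFilter = bloomFilter;
    }
}

// Hypothetical key-value store: openDB() delegates the table format to a hook.
class KeyValueStoreSketch {
    static final long BLOCK_CACHE_SIZE = 50 * 1024 * 1024L;
    static final long BLOCK_SIZE = 4096L;

    // Default: point lookups dominate, so turn the Bloom filter on.
    protected TableConfigSketch getTableConfig() {
        return new TableConfigSketch(BLOCK_CACHE_SIZE, BLOCK_SIZE, true);
    }

    // In a real store this would also build Options and open RocksDB;
    // here it just returns whatever the hook produced.
    TableConfigSketch openDB() {
        return getTableConfig();
    }
}

// Hypothetical windowed store: overrides the hook to skip the Bloom filter.
class WindowStoreSketch extends KeyValueStoreSketch {
    @Override
    protected TableConfigSketch getTableConfig() {
        return new TableConfigSketch(BLOCK_CACHE_SIZE, BLOCK_SIZE, false);
    }
}
```

If Bloom filters turn out to be fine for windowed stores too, deleting the override is all it takes to enable them across the board.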

Member


Why? Is it about range queries? Recall that we convert range queries into multiple point lookups.

Furthermore, range queries could happen via IQ on key-value stores, too.

Member Author


Why? Is it about range queries? Recall that we convert range queries into multiple point lookups.

I took a cursory look at the code, but you raise a good point. Overall, I'm thinking we may need to check whether Bloom filters affect range queries, and if not, just enable them across the board.

@bbejeck
Member Author

bbejeck commented Dec 7, 2018

Kicked off both Streams simple benchmark tests.

@mjsax mjsax added the streams label Dec 9, 2018
@vvcephei
Contributor

Thanks, @bbejeck!

Do you know if Rocks will automatically upgrade existing stores to add the bloom filter, or, if not, will it gracefully handle their absence?

@bbejeck
Member Author

bbejeck commented Dec 10, 2018

Do you know if Rocks will automatically upgrade existing stores to add the bloom filter, or, if not, will it gracefully handle their absence?

@vvcephei - I think so, but I'll add a test to an existing unit/integration test to confirm

final BlockBasedTableConfig tableConfig = new BlockBasedTableConfig();
tableConfig.setBlockCacheSize(BLOCK_CACHE_SIZE);
tableConfig.setBlockSize(BLOCK_SIZE);
tableConfig.setFilter(new BloomFilter());
Contributor


Hi @bbejeck,

Should we also consider options.optimizeFiltersForHits() to save memory on the bloom filter in exchange for one I/O for each get on a missing key?

(see https://github.com/facebook/rocksdb/wiki/Memory-usage-in-RocksDB#indexes-and-filter-blocks)

I just ran across this while reading about caching in Rocks.

Member Author


@vvcephei yeah that makes sense.

cc @guozhangwang @mjsax WDYT?

Contributor


FWIW, here's what I'm thinking...
Bloom filters can only tell you if the key is definitely not in the set. They can only answer "no" or "maybe". So if the filter says the key isn't in some sst we don't have to actually do the I/O to check, but if it says "maybe", we do still have to check.
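To make the "no" vs. "maybe" behaviour concrete, here is a toy Bloom filter. It is a minimal sketch for illustration, not RocksDB's filter implementation; the class name ToyBloomFilter, the bit-array size and the double-hashing scheme are assumptions. A false from mightContain is a guaranteed miss (the SST read can be skipped); a true only means the key may be present, so the read still has to happen.

```java
import java.util.BitSet;

// Toy Bloom filter (illustrative only). Each key sets numHashes bits chosen
// by double hashing; lookups check that all of those bits are set.
class ToyBloomFilter {
    private final BitSet bits;
    private final int numBits;
    private final int numHashes;

    ToyBloomFilter(int numBits, int numHashes) {
        this.bits = new BitSet(numBits);
        this.numBits = numBits;
        this.numHashes = numHashes;
    }

    // i-th probe position: h1 + i * h2, folded into [0, numBits).
    private int position(String key, int i) {
        int h1 = key.hashCode();
        int h2 = Integer.rotateRight(h1, 17) | 1; // derived second hash, forced odd
        return Math.floorMod(h1 + i * h2, numBits);
    }

    void add(String key) {
        for (int i = 0; i < numHashes; i++) {
            bits.set(position(key, i));
        }
    }

    // false => "no": key was never added, skip the SST read.
    // true  => "maybe": key might be present, the SST must still be read.
    boolean mightContain(String key) {
        for (int i = 0; i < numHashes; i++) {
            if (!bits.get(position(key, i))) {
                return false;
            }
        }
        return true;
    }
}
```

Every added key always answers true (no false negatives), while an unseen key usually, but not always, answers false.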

This optimization would add a bloom filter to every level in the SST hierarchy except the last level.

The rationale is that if you're pretty sure the keys you get are in the db, the bloom filters at the higher levels would let you skip querying the SSTs that don't contain your key. If you get all the way to the bottom level, we're pretty sure the key is there (via our prior assumption), so checking the bloom filter isn't that valuable, since it would rarely answer "no".
If it answers "maybe", we have to check anyway. In other words, the filter only saves I/O in the rare case that it does say "no".
On the other hand, the last level has the most keys in it, so those are the most expensive filters. By dropping that last level of filters, we save a bunch of memory in exchange for rare extra I/Os.
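The memory argument can be checked with back-of-the-envelope arithmetic. The sketch below assumes leveled compaction in which each level holds 10x the keys of the level above (RocksDB's default level size multiplier is 10) and an assumed 10 bits of filter per key; FilterMemoryModel and its parameters are hypothetical. Under those assumptions the bottommost level accounts for roughly 90% of all filter memory, which is the part optimizeFiltersForHits stops building.

```java
// Hypothetical model: numLevels levels whose key counts grow by `multiplier`
// per level, each key costing `bitsPerKey` bits of Bloom filter.
class FilterMemoryModel {
    // Filter bytes needed per level, top level first.
    static long[] filterBytesPerLevel(long firstLevelKeys, int multiplier,
                                      int numLevels, int bitsPerKey) {
        long[] bytes = new long[numLevels];
        long keys = firstLevelKeys;
        for (int level = 0; level < numLevels; level++) {
            bytes[level] = keys * bitsPerKey / 8;
            keys *= multiplier;
        }
        return bytes;
    }

    // Share of total filter memory held by the bottommost level, i.e. the
    // memory saved by not building filters for that level.
    static double bottomLevelShare(long[] bytes) {
        long total = 0;
        for (long b : bytes) {
            total += b;
        }
        return (double) bytes[bytes.length - 1] / total;
    }
}
```

For example, with 1,000,000 keys in the first of four levels, the per-level filter sizes are 1.25 MB, 12.5 MB, 125 MB and 1.25 GB, so the last level's share is about 0.90.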

Do we have a prior assumption that the keys we query for are rarely missing? I think so...
In general, Streams only does a get while computing an aggregation value, etc. In this case, it does a get followed by a put. Therefore, the only get that might return missing is the very first one for each key.

Factors that would cause more missed gets would be stuff like:

  • IQ
  • data sets that have a lot of thrash in the key space
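The "get followed by a put" argument can be simulated to see why misses are rare in the common case. This is a hypothetical model of a count aggregation over an in-memory map (AggregationMissCounter is not a Kafka class): only the first get for each distinct key misses, so the miss count equals the number of distinct keys no matter how many records arrive.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of a count aggregation: every record does a get()
// for the current aggregate followed by a put() of the updated value.
class AggregationMissCounter {
    static int countMisses(List<String> recordKeys) {
        Map<String, Long> store = new HashMap<>();
        int misses = 0;
        for (String key : recordKeys) {
            Long current = store.get(key);   // the get
            if (current == null) {
                misses++;                    // first time this key is seen
                current = 0L;
            }
            store.put(key, current + 1);     // the put
        }
        return misses;
    }
}
```

For the record keys a, b, a, a, c, b this reports 3 misses out of 6 gets. IQ lookups or a key space with a lot of thrash break the assumption because they add gets for keys that were never put.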

Member Author

@bbejeck bbejeck Jan 10, 2019


Thanks for the more in-depth explanation, @vvcephei; sounds good to me.

Contributor


Thanks, @vvcephei, for the explanation. I agree with you that in most cases we expect multiple gets on each key (only the first get will miss). Besides the cases you already listed, another exception is the windowed stream-stream join, which will always have distinct keys; but given that this may be fixed in the future, I'm in favor of adding it as well.

@mjsax
Member

mjsax commented Jan 14, 2019

@bbejeck Should this be targeted to https://issues.apache.org/jira/browse/KAFKA-4850 instead of being a "MINOR" change?

@mjsax
Member

mjsax commented Jan 14, 2019

It seems there is another similar PR, #3048. Should we close the other PR after this one gets merged?

cc @guozhangwang

@bbejeck bbejeck changed the title MINOR: Enable bloomfilters KAFKA-4850: Enable bloomfilters Jan 15, 2019
@bbejeck
Member Author

bbejeck commented Jan 15, 2019

@bbejeck Should this be targeted to https://issues.apache.org/jira/browse/KAFKA-4850 instead of being a "MINOR" change?

@mjsax - done

Contributor

@guozhangwang guozhangwang left a comment


Hi @bbejeck, I've got two meta comments:

  1. Could you upload the benchmark results (the original URL has expired) as images to this PR, so the motivation is on record for other readers?

  2. As discussed in this PR, I think it is better to enable options.optimizeFiltersForHits(), as suggested by @vvcephei.

private static final long WRITE_BUFFER_SIZE = 16 * 1024 * 1024L;
private static final long BLOCK_CACHE_SIZE = 50 * 1024 * 1024L;
private static final long BLOCK_SIZE = 4096L;
protected static final long BLOCK_CACHE_SIZE = 50 * 1024 * 1024L;
Contributor


Why do these two need to be protected now?

Member Author


Oversight from a previous change; I'll revert.


@bbejeck bbejeck force-pushed the MINOR_enable_bloom_filters branch from 1db37c0 to 75a9ea6 Compare January 16, 2019 15:29
@bbejeck
Member Author

bbejeck commented Jan 16, 2019

Rebased with trunk and enabled optimizeFiltersForHits. I'll add a unit test next; I pushed the updated branch now to kick off smoke tests for perf numbers.

@guozhangwang
Contributor

Thanks, @bbejeck! Please ping the reviewers whenever you think it's ready for another look.

@bbejeck
Member Author

bbejeck commented Jan 22, 2019

Updated this with a unit test that saves records with Bloom filters off, closes RocksDB, then reopens it with Bloom filters enabled: there are no errors, and the previous records are retrieved successfully.

@bbejeck
Member Author

bbejeck commented Jan 22, 2019

Here are performance numbers from 5 runs of StreamsSimpleBenchmark (process-rate for each benchmark):

Run 1                          Bloom filter enabled   Bloom filter disabled
streamcount                    76084                  71721
streamcountwindowed            29442                  29025
streamprocess                  89753                  90035
streamprocesswithsink          64727                  69181
streamprocesswithstatestore    54120                  51843
streamprocesswithwindowstore   276                    296
streamstreamjoin               9022                   8564
streamtablejoin                62758                  61632
tabletablejoin                 23131                  20248

Run 2                          Bloom filter enabled   Bloom filter disabled
streamcount                    77640                  72629
streamcountwindowed            31144                  29514
streamprocess                  89927                  90174
streamprocesswithsink          67203                  71722
streamprocesswithstatestore    55822                  53375
streamprocesswithwindowstore   306                    298
streamstreamjoin               8829                   8565
streamtablejoin                63545                  61512
tabletablejoin                 22557                  20816

Run 3                          Bloom filter enabled   Bloom filter disabled
streamcount                    79359                  68352
streamcountwindowed            30884                  27044
streamprocess                  89071                  87730
streamprocesswithsink          67792                  68265
streamprocesswithstatestore    54194                  49070
streamprocesswithwindowstore   295                    245
streamstreamjoin               8906                   8294
streamtablejoin                63767                  54966
tabletablejoin                 22190                  18780

Run 4                          Bloom filter enabled   Bloom filter disabled
streamcount                    78089                  71954
streamcountwindowed            30700                  28342
streamprocess                  89418                  88664
streamprocesswithsink          66792                  68618
streamprocesswithstatestore    54254                  51045
streamprocesswithwindowstore   304                    254
streamstreamjoin               9020                   8298
streamtablejoin                61304                  61538
tabletablejoin                 20046                  22182

Run 5                          Bloom filter enabled   Bloom filter disabled
streamcount                    72707                  75692
streamcountwindowed            26947                  30048
streamprocess                  89815                  88445
streamprocesswithsink          65672                  71350
streamprocesswithstatestore    51210                  53989
streamprocesswithwindowstore   294                    263
streamstreamjoin               9317                   8978
streamtablejoin                62071                  60340
tabletablejoin                 22024                  21077
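To summarize the five runs, the sketch below averages each benchmark's process-rate and prints the relative change with Bloom filters enabled. The numbers are copied from the runs posted above; the class name BenchmarkSummary is hypothetical, and five runs are too few to treat small differences as significant.

```java
// Averages of the five StreamsSimpleBenchmark runs posted above (process-rate),
// Bloom filters enabled vs. disabled. BenchmarkSummary is a hypothetical helper.
class BenchmarkSummary {
    static final String[] NAMES = {
        "streamcount", "streamcountwindowed", "streamprocess",
        "streamprocesswithsink", "streamprocesswithstatestore",
        "streamprocesswithwindowstore", "streamstreamjoin",
        "streamtablejoin", "tabletablejoin"
    };

    // One row per benchmark (same order as NAMES), one column per run 1..5.
    static final int[][] ENABLED = {
        {76084, 77640, 79359, 78089, 72707},
        {29442, 31144, 30884, 30700, 26947},
        {89753, 89927, 89071, 89418, 89815},
        {64727, 67203, 67792, 66792, 65672},
        {54120, 55822, 54194, 54254, 51210},
        {276, 306, 295, 304, 294},
        {9022, 8829, 8906, 9020, 9317},
        {62758, 63545, 63767, 61304, 62071},
        {23131, 22557, 22190, 20046, 22024}
    };

    // Same layout, Bloom filters disabled.
    static final int[][] DISABLED = {
        {71721, 72629, 68352, 71954, 75692},
        {29025, 29514, 27044, 28342, 30048},
        {90035, 90174, 87730, 88664, 88445},
        {69181, 71722, 68265, 68618, 71350},
        {51843, 53375, 49070, 51045, 53989},
        {296, 298, 245, 254, 263},
        {8564, 8565, 8294, 8298, 8978},
        {61632, 61512, 54966, 61538, 60340},
        {20248, 20816, 18780, 22182, 21077}
    };

    static double mean(int[] xs) {
        long sum = 0;
        for (int x : xs) {
            sum += x;
        }
        return (double) sum / xs.length;
    }

    // Relative change of the enabled mean versus the disabled mean, in percent.
    static double percentChange(int[] enabled, int[] disabled) {
        double base = mean(disabled);
        return 100.0 * (mean(enabled) - base) / base;
    }

    public static void main(String[] args) {
        System.out.printf("%-30s %10s %10s %8s%n", "benchmark", "enabled", "disabled", "change");
        for (int i = 0; i < NAMES.length; i++) {
            System.out.printf("%-30s %10.1f %10.1f %+7.1f%%%n",
                NAMES[i], mean(ENABLED[i]), mean(DISABLED[i]),
                percentChange(ENABLED[i], DISABLED[i]));
        }
    }
}
```

The streamprocesswithsink row comes out negative (means of about 66437 enabled vs. 69827 disabled) even though that benchmark uses no state store, so run-to-run noise is a plausible explanation for differences of that size.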

@bbejeck
Member Author

bbejeck commented Jan 22, 2019

ping @ableegoldman, @guozhangwang, @mjsax, and @vvcephei for another review

Contributor

@vvcephei vvcephei left a comment


Hi @bbejeck,

Thanks for adding the test and the optimization. It looks good overall.

I'm just curious about the motivation for changing the vagrant config...

Thanks,
-John

Review thread on vagrant/base.sh (outdated)
Contributor


Just curious, why did you need to update the jvm?

Member Author


The branch builder was failing without this update (it's from the tools team), but I meant to pull this commit out; I'll rebase.

@bbejeck bbejeck force-pushed the MINOR_enable_bloom_filters branch from 46bd465 to 846ea8d Compare January 22, 2019 15:43
Contributor

@vvcephei vvcephei left a comment


Looks good to me! Thanks, @bbejeck

Contributor

@guozhangwang guozhangwang left a comment


LGTM. Thanks, @bbejeck.

One interesting observation, though: for process-with-sink, which does not use state stores at all, the new code is consistently worse than current trunk. We need to look into a better way to do consistent perf regression testing :)

@guozhangwang guozhangwang merged commit 0efed12 into apache:trunk Jan 24, 2019
jarekr pushed a commit to confluentinc/kafka that referenced this pull request Apr 18, 2019
* ak/trunk:
  MINOR: fix race condition in KafkaStreamsTest (apache#6185)
  KAFKA-4850: Enable bloomfilters (apache#6012)
  MINOR: ducker-ak: add down -f, avoid using a terminal in ducker test
  KAFKA-5117: Stop resolving externalized configs in Connect REST API
  MINOR: Cleanup handling of mixed transactional/idempotent records (apache#6172)
  KAFKA-7844: Use regular subproject for generator to fix *All targets (apache#6182)
  Fix Documentation for cleanup.policy is out of date (apache#6181)
  MINOR: increase timeouts for KafkaStreamsTest (apache#6178)
  MINOR: Rejoin split ssl principal mapping rules (apache#6099)
  MINOR: Handle case where connector status endpoints returns 404 (apache#6176)
  MINOR: Remove unused imports, exceptions, and values (apache#6117)
  KAFKA-3522: Add internal RecordConverter interface (apache#6150)
  Fix Javadoc of KafkaConsumer (apache#6155)
  KAFKA-6455: Extend CacheFlushListener to forward timestamp (apache#6147)
  MINOR: Log partition info when creating new request batch in controller (apache#6145)
  KAFKA-7652: Part I; Fix SessionStore's findSession(single-key) (apache#6134)
  MINOR: Remove the InvalidTopicException handling in InternalTopicManager (apache#6167)
  [KAFKA-7024] Rocksdb state directory should be created before opening the DB (apache#6138)
  MINOR:: Fix typos (apache#6079)
pengxiaolong pushed a commit to pengxiaolong/kafka that referenced this pull request Jun 14, 2019
This PR enables BloomFilters for RocksDB to speed up point lookups.
The request for this has been around for some time - https://issues.apache.org/jira/browse/KAFKA-4850

For testing, I've done the following:

Ran the standard streams suite of unit and integration tests
Kicked off the simple benchmark test with bloom filters enabled
Kicked off the simple benchmark test with bloom filters not enabled
Kicked off streams system tests

Reviewers: Matthias J. Sax <matthias@confluent.io>, Guozhang Wang <wangguoz@gmail.com>, John Roesler <john@confluent.io>
@bbejeck bbejeck deleted the MINOR_enable_bloom_filters branch July 10, 2024 13:59

4 participants