Shared subscription: improvement with offloaded ledgers #16417
Conversation
...in/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreBackedReadHandleImpl.java
```java
// never over to the last entry again.
if (!seeked) {
    inputStream.seek(index.getIndexEntryForEntry(nextExpectedId).getDataOffset());
    Long knownOffset = entryOffsets.get(nextExpectedId);
```
Maybe we can add a function to encapsulate this logic, and have all the index.getIndexEntryForEntry(nextExpectedId).getDataOffset() call sites go through this function.
The other seek places also need to check the index cache first.
Sounds good. In my testing it looked like only this point needs the cache, but it is better to use it everywhere.
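A minimal sketch of what such a helper could look like, reusing the field names from the diff above (the method name getOffsetForEntry is hypothetical, not part of the actual patch):

```java
// Hypothetical helper: prefer the exact cached offset for an entry,
// fall back to the coarser block index otherwise.
private long getOffsetForEntry(long entryId) {
    // exact offset recorded in the per-reader cache, if this entry was read before
    Long knownOffset = entryOffsets.getIfPresent(entryId);
    if (knownOffset != null) {
        return knownOffset;
    }
    // the block index only tracks some entries, so this offset may point
    // to the beginning of the containing block, before the requested entry
    return index.getIndexEntryForEntry(entryId).getDataOffset();
}
```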
...c/test/java/org/apache/bookkeeper/mledger/offload/jcloud/BlobStoreBackedInputStreamTest.java
Force-pushed from 5997cd1 to afea8e9
```java
// this Cache is accessed only by one thread
private final Cache<Long, Long> entryOffsets = CacheBuilder
        .newBuilder()
        .expireAfterAccess(10, TimeUnit.MINUTES)
```
10 minutes may be too short; maybe 30 minutes to 1 hour? This cache could also be shared with other catch-up readers.
I have changed it to 30 minutes and made it configurable via a system property.
I don't think we have to document the system property; I made it configurable only in case of problems.
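A sketch of how such a system property could be wired, assuming the Guava Cache from the diff above (the property name here is illustrative; the actual one is intentionally undocumented):

```java
import java.util.concurrent.TimeUnit;

import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;

class OffsetsCacheSketch {
    // Hypothetical property name; defaults to the agreed 30 minutes.
    private static final long OFFSETS_CACHE_TTL_MINUTES =
            Long.getLong("pulsar.offload.entryOffsetsCacheTtlMinutes", 30L);

    // this Cache is accessed only by one thread
    private final Cache<Long, Long> entryOffsets = CacheBuilder
            .newBuilder()
            .expireAfterAccess(OFFSETS_CACHE_TTL_MINUTES, TimeUnit.MINUTES)
            .build();
}
```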
hangc0276 left a comment:
Good job!
ping @zymap @horizonzy please help take a look, thanks.
Force-pushed from dba2dba to f0237a7
Force-pushed from f0237a7 to 388912d
horizonzy left a comment:
LGTM.
@eolivelli Please rebase onto the master branch.
@Technoboy- @codelipenghui |


Motivation
When you use a Shared subscription, the Dispatcher (PersistentDispatcherMultipleConsumers) does out-of-order reads from the ManagedLedger. This happens very frequently in the consumerFlow method, which basically re-reads messages that have not been processed yet.
It happens because consumerFlow calls readMoreEntries(), which in turn finds messages that must be re-delivered because they have not yet been acknowledged by any consumer.
This triggers backward seeks in BlobStoreBackedReadHandleImpl. Even when the data is already loaded in memory, most of the time it has to be parsed again, because the seek currently goes to the beginning of a block: the index only keeps track of some entries, not of every entry.
Modifications
Add an in-memory cache of the offset of each entry. This way, when a backward seek is needed, we seek exactly to the position where the entry can be found (see the sketch below).
The trade-off is that this per-entry offsets cache adds some memory cost.
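A rough sketch of the two halves of the idea, with assumed accessor and helper names (getCurrentPosition and readNextEntry are illustrative, not the actual patch):

```java
// 1) While reading forward, remember where each entry started.
long offsetBeforeRead = inputStream.getCurrentPosition(); // assumed accessor
LedgerEntry entry = readNextEntry(inputStream);           // assumed helper
entryOffsets.put(entry.getEntryId(), offsetBeforeRead);

// 2) On a backward seek, jump straight to the recorded offset instead of
//    re-parsing from the beginning of the containing block.
Long knownOffset = entryOffsets.getIfPresent(entryId);
long seekTo = knownOffset != null
        ? knownOffset
        : index.getIndexEntryForEntry(entryId).getDataOffset();
inputStream.seek(seekTo);
```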
Verifying this change
This change added tests.
I did some manual testing with GCP: this patch brings a 2x throughput improvement for a Shared subscription with a single consumer (measured with pulsar-perf).
@dave2wave did more extensive testing of complex scenarios with OpenMessaging Benchmark and confirmed that this patch brings a big improvement for Shared subscriptions.
doc-not-needed