[enh][transaction] Optimize to reuse transaction buffer snapshot writer#15015
[enh][transaction] Optimize to reuse transaction buffer snapshot writer#15015gaoran10 wants to merge 8 commits intoapache:masterfrom
Conversation
|
|
||
| private synchronized void initWriterFuture() { | ||
| this.future = service.getTransactionBufferSystemTopicClient(namespaceName).newWriterAsync(); | ||
| this.future.thenRunAsync(this.backoff::reset).exceptionally(throwable -> { |
There was a problem hiding this comment.
Why use thenRunAsync with common-pool here? I'm not sure we should use a specific executor?
There was a problem hiding this comment.
Does the common pool have some potential problems?
There was a problem hiding this comment.
It's OK here.
Just to prevent others from using common-pool to do some IO-intensive tasks and affecting there.
After deep thinking, I think we need to check all abuses of common-pool, not against it.
| // Resolve potential race condition problem, if retain method encounter reference count exception | ||
| // or other exceptions, create a new `ReferenceCountedWriter`, when the `ReferenceCountedWriter` release | ||
| // but didn't remove from `writerFutureMap`. | ||
| return new ReferenceCountedWriter(namespaceName, this); |
There was a problem hiding this comment.
Question: Should we close the original writer?
There was a problem hiding this comment.
The previous writer will be closed when its reference count reduces to 0, if its reference count reduces to 0, it must call the method deallocate.
There was a problem hiding this comment.
Ok, thanks for your explanation.
|
|
||
| // The class ReferenceCountedWriter will maintain the reference count, | ||
| // when the reference count decrement to 0, it will be removed from writerFutureMap, the writer will be closed. | ||
| public static class ReferenceCountedWriter extends AbstractReferenceCounted { |
There was a problem hiding this comment.
I suggest don't extends AbstractReferenceCounted, deallocate seem not be used. and is complicated to implement
There was a problem hiding this comment.
When the reference count reduces to 0, the deallocate will be called, maybe using it to compute the reference count is simple, it's thread-safe.
| private synchronized void initWriterFuture() { | ||
| this.future = service.getTransactionBufferSystemTopicClient(namespaceName).newWriterAsync(); | ||
| this.future.thenRunAsync(this.backoff::reset).exceptionally(throwable -> { | ||
| long delay = backoff.next(); |
There was a problem hiding this comment.
seem we only use backoff.next() once and then reset
There was a problem hiding this comment.
If encounter exceptions the backoff will increase, if the writer creates successfully, it will be reset.
The transaction buffer(short for TB) snapshot topic is used to persistent snapshot data for TB, the snapshot can reduce read data when recovering. Currently, if enable transaction feature, every topic will create a TB snapshot producer, the topic of the producer is a namespace system topic, so we don't need to create producer for every topic, one producer per namespace is enough. # Modification Add a new inner class `ReferenceCountedWriter`, it extends `AbstractReferenceCounted`, if the `ReferenceCountedWriter` is not exist, it will be initilied, or else its reference count will be increased, after the reference count reduce to 0, it will be removed from map cache.
a3c098b to
7cac709
Compare
congbobo184
left a comment
There was a problem hiding this comment.
Good work! left some comments
| } | ||
|
|
||
| private void closePendingCloseWriter() { | ||
| Iterator<Writer<TransactionBufferSnapshot>> iterator = pendingCloseWriterList.stream().iterator(); |
There was a problem hiding this comment.
Nit: why are you creating the stream?
We can iterate on the LinkedList directly, creating less garbage and also making the code simpler
|
The pr had no activity for 30 days, mark with Stale label. |
|
The pr had no activity for 30 days, mark with Stale label. |
|
This PR is too old, I create a new one #19641. |
Motivation
The transaction buffer(short for TB) snapshot topic is used to persistent snapshot data for TB, the snapshot can reduce read data when recovering.
Currently, if enable the transaction feature, every topic will create a TB snapshot producer, the topic of the producer is a namespace system topic, so we don't need to create a producer for every topic, one producer per namespace is enough.
Modification
Add a new inner class
ReferenceCountedWriter, if theReferenceCountedWriterfor one namespace does not exist, it will be initialized, or else its reference count will be increased, if the reference count reduces to 0, it will be removed from map-cache.Verifying this change
Adjust existing tests to cover this change.
Does this pull request potentially affect one of the following parts:
If
yeswas chosen, please highlight the changesDocumentation
Check the box below or label this PR directly (if you have committer privilege).
Need to update docs?
doc-required(If you need help on updating docs, create a doc issue)
no-need-doc(Please explain why)
doc(If this PR contains doc changes)