
Conversation

@thetumbled
Member

@thetumbled thetumbled commented Jan 12, 2023

Fixes #19200

Motivation

A transaction can last for a long time without being aborted, which prevents the transaction buffer's (TB's) MaxReadPosition from moving forward, so no new snapshot is taken. With an old snapshot, the TB has to read a lot of entries during recovery.
In the worst cases, topics are unavailable for 30 minutes.

Modifications

Make CoordinatorNotFoundException retryable.
Avoid concurrent execution.

Verifying this change

  • Make sure that the change passes the CI checks.

(Please pick either of the following options)

This change is a trivial rework / code cleanup without any test coverage.

Does this pull request potentially affect one of the following parts:

If the box was checked, please highlight the changes

  • Dependencies (add or upgrade a dependency)
  • The public API
  • The schema
  • The default values of configurations
  • The threading model
  • The binary protocol
  • The REST endpoints
  • The admin CLI options
  • The metrics
  • Anything that affects deployment

Documentation

  • doc
  • doc-required
  • doc-not-needed
  • doc-complete

Matching PR in forked repository

PR in forked repository: thetumbled#12

@github-actions github-actions bot added the doc-not-needed Your PR changes do not impact docs label Jan 12, 2023
@thetumbled
Member Author

Maybe there is a better way to fix it?
For example, move the following code

stores.put(tcId, store);

from org.apache.pulsar.broker.TransactionMetadataStoreService#handleTcClientConnect to org.apache.pulsar.transaction.coordinator.TransactionLogReplayCallback#replayComplete

@congbobo184
Contributor

If the TC does not exist in the broker, the op doesn't need to retry. #18924 may have fixed this problem.

@thetumbled
Member Author

thetumbled commented Jan 13, 2023

If the TC does not exist in the broker, the op doesn't need to retry. #18924 may have fixed this problem.

I have included that PR in my test environment, and the exceptions above were still thrown. That PR does not guarantee that the TC has been put into the stores map before handleCommittingAndAbortingTransaction is executed, because of concurrent execution.
The TC may exist in the broker, but it just has not been put into the stores map yet.

@congbobo184
Contributor

@thetumbled

openTransactionMetadataStore(tcId).thenAccept((store) -> internalPinnedExecutor.execute(() -> {
stores.put(tcId, store);

The future's thenAccept changes the executing thread, so the problem has never been truly fixed.

So it's better to change the code like this to solve the problem:

                        openTransactionMetadataStore(tcId).thenAccept((store) -> {
                            stores.put(tcId, store);
                            internalPinnedExecutor.execute(() -> {
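The property this suggestion relies on is worth spelling out: a dependent callback registered with the non-async thenAccept before completion is normally run by the thread that calls complete(), before complete() returns. A tiny stand-alone sketch of that ordering (illustrative only, not the Pulsar code):

    import java.util.concurrent.CompletableFuture;

    public class ThenAcceptOrderingSketch {
        public static void main(String[] args) {
            CompletableFuture<String> future = new CompletableFuture<>();

            // registered before completion; runs on the completing thread, inside complete()
            future.thenAccept(store -> System.out.println("1. put the store into the map"));

            future.complete("store");
            // whatever the completing thread does next (e.g. the recovery step) runs after the callback
            System.out.println("2. handle committing/aborting transactions");
        }
    }

So with the suggested shape, stores.put runs on the thread that completes the open-store future, before that thread goes on to the recovery step.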

@thetumbled
Member Author

thetumbled commented Jan 16, 2023

@thetumbled

openTransactionMetadataStore(tcId).thenAccept((store) -> internalPinnedExecutor.execute(() -> {
stores.put(tcId, store);

The future's thenAccept changes the executing thread, so the problem has never been truly fixed.
So it's better to change the code like this to solve the problem:

                        openTransactionMetadataStore(tcId).thenAccept((store) -> {
                            stores.put(tcId, store);
                            internalPinnedExecutor.execute(() -> {

Good idea. I have changed the patch.

@codelipenghui
Contributor

@thetumbled It looks like the change is not about the issue that you described in the PR details. When reading the motivation of the PR, I thought it was related to the transaction buffer snapshot, but after checking the changes, it looks like the fix is for the transaction coordinator.

@thetumbled
Member Author

thetumbled commented Jan 16, 2023

@thetumbled It looks like the change is not about the issue that you described in the PR details. When reading the motivation of the PR, I thought it was related to the transaction buffer snapshot, but after checking the changes, it looks like the fix is for the transaction coordinator.

When I was troubleshooting why transaction recovery took a long time, I found that the root cause is that some transactions cannot be terminated even though they have exceeded the timeout, which leads to the downstream problems described above.

@congbobo184 congbobo184 changed the title [fix] [broker] fix timeout transaction. [fix][txn] fix txn coordinator recover handle committing and aborting txn race condition. Jan 16, 2023
completableFuture.complete(null);
tcLoadSemaphore.release();
})).exceptionally(e -> {
completableFuture.complete(null);
Contributor

@congbobo184 @thetumbled Sorry, I didn't get the key point of the problem. The completableFuture is completed by the same thread that executes stores.put(tcId, store); why would we have a race condition here? The client side should send the end-transaction command only after completing the TC connect stage.

Member Author

@thetumbled thetumbled Jan 29, 2023

There are two thread pools, org.apache.pulsar.transaction.coordinator.impl.MLTransactionMetadataStore#internalPinnedExecutor and org.apache.pulsar.broker.TransactionMetadataStoreService#internalPinnedExecutor. They are different.

Contributor

Yes, but before we complete the completableFuture here, we have already added the item to the map. Why is the subsequent request not able to get it from the map? The subsequent request only happens after the completableFuture is done, right? And the map is a ConcurrentHashMap, so what is the race condition here? Could you please provide more details about the race condition? How does it happen?

Member Author

The completableFuture returned by openTransactionMetadataStore(tcId) is completed by the following code, on the MLTransactionMetadataStore#internalPinnedExecutor thread.

                        completableFuture.complete(MLTransactionMetadataStore.this);
                        recoverTracker.handleCommittingAndAbortingTransaction();

Once the completableFuture is completed, stores.put is executed on a different thread, TransactionMetadataStoreService#internalPinnedExecutor.

                        openTransactionMetadataStore(tcId).thenAccept((store) -> internalPinnedExecutor.execute(() -> {
                            stores.put(tcId, store);

So we may execute recoverTracker.handleCommittingAndAbortingTransaction() before stores.put(tcId, store) has been executed.
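To make the interleaving concrete, here is a minimal runnable sketch of the hazard; all class and variable names are invented stand-ins, not the real Pulsar code. The thread that completes the future goes straight on to the recovery step, while the map put has only been queued onto a different executor:

    import java.util.Map;
    import java.util.concurrent.*;

    public class RaceSketch {
        static final Map<Long, String> stores = new ConcurrentHashMap<>();

        public static void main(String[] args) throws Exception {
            // stands in for MLTransactionMetadataStore#internalPinnedExecutor
            ExecutorService storeExecutor = Executors.newSingleThreadExecutor();
            // stands in for TransactionMetadataStoreService#internalPinnedExecutor
            ExecutorService serviceExecutor = Executors.newSingleThreadExecutor();
            long tcId = 0L;

            CompletableFuture<String> open = new CompletableFuture<>();

            // service side: hop to its own executor before putting into the map (the shape under discussion)
            open.thenAccept(store -> serviceExecutor.execute(() -> stores.put(tcId, store)));

            // store side: complete the future, then immediately run the recovery step on the same thread
            storeExecutor.execute(() -> {
                open.complete("store-" + tcId);
                // stands in for recoverTracker.handleCommittingAndAbortingTransaction(),
                // which needs the entry to already be present in `stores`
                System.out.println("recovery sees the store? " + stores.containsKey(tcId)); // often false
            });

            storeExecutor.shutdown();
            serviceExecutor.shutdown();
            storeExecutor.awaitTermination(5, TimeUnit.SECONDS);
            serviceExecutor.awaitTermination(5, TimeUnit.SECONDS);
        }
    }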

Contributor

Essentially this is a circular dependency problem.

TransactionMetadataStoreService.handleTcClientConnect -> MLTransactionMetadataStore.init -> TransactionRecoverTracker.handleCommittingAndAbortingTransaction -> TransactionMetadataStoreService.handleTcClientConnect

It looks like we need to ensure some state is changed in TransactionMetadataStoreService while initializing the MLTransactionMetadataStore.

IMO, we should eventually refactor this part to move recoverTracker.handleCommittingAndAbortingTransaction(); into the TransactionMetadataStoreService to decouple the mutual state dependence.

@congbobo184 @liangyepianzhou WDYT?

It's hard to understand, while reading the code, why the map put operation should be executed outside the internalPinnedExecutor. This may present challenges for future maintenance.

Contributor

@congbobo184 congbobo184 Jan 30, 2023

openTransactionMetadataStore(tcId) could return a MutablePair<store, recoverTracker>, or openTransactionMetadataStore(tcId) could be followed by an init that returns the recoverTracker. I prefer the second way; that way the logic is clearer: after the store init, the tracker needs to handle the legacy committing and aborting transactions.

Member Author

@thetumbled thetumbled Jan 30, 2023

The first solution is more concise, and I have implemented it.
As for the second approach, do you mean moving the init method out of the openTransactionMetadataStore method?

Contributor

@congbobo184 congbobo184 Jan 30, 2023

Yes, the second way can decouple them completely. @codelipenghui WDYT?

Contributor

Sorry, I haven't gotten the point of "openTransactionMetadataStore(tcId) then init returns the recoverTracker". @congbobo184, can you share the link?

Contributor

public CompletableFuture<TransactionMetadataStore> init(TransactionRecoverTracker recoverTracker) {

change to public CompletableFuture<TransactionRecoverTracker> init().

openTransactionMetadataStore(tcId) only returns the store; then the store can add an interface public CompletableFuture<TransactionRecoverTracker> init(), and TransactionMetadataStoreService can invoke store.init() to get the TransactionRecoverTracker.

Once init completes, the recoverTracker can handle the committing and aborting transactions in the TransactionMetadataStoreService.
Contributor

@codelipenghui codelipenghui left a comment

Looks good to me now.
I just left some minor comments.

@thetumbled thetumbled force-pushed the fixbug_TransactionTimeout branch from 449d962 to e9f2f30 on January 30, 2023 09:25
@codelipenghui
Contributor

@congbobo184 @liangyepianzhou Please help review again.

@codecov-commenter

codecov-commenter commented Jan 30, 2023

Codecov Report

Merging #19201 (c7c2455) into master (4b0dc9a) will increase coverage by 11.74%.
The diff coverage is 47.36%.

Impacted file tree graph

@@              Coverage Diff              @@
##             master   #19201       +/-   ##
=============================================
+ Coverage     48.96%   60.71%   +11.74%     
- Complexity     7300    25533    +18233     
=============================================
  Files           424     1895     +1471     
  Lines         45473   137527    +92054     
  Branches       4672    15099    +10427     
=============================================
+ Hits          22268    83503    +61235     
- Misses        20698    46300    +25602     
- Partials       2507     7724     +5217     
Flag Coverage Δ
systests 24.80% <0.00%> (?)
unittests 58.80% <47.36%> (+9.83%) ⬆️

Flags with carried forward coverage won't be shown. Click here to find out more.

Impacted Files Coverage Δ
...va/org/apache/pulsar/broker/service/ServerCnx.java 52.23% <ø> (+9.39%) ⬆️
...n/coordinator/impl/MLTransactionMetadataStore.java 75.64% <ø> (ø)
...pulsar/broker/TransactionMetadataStoreService.java 58.69% <47.36%> (+6.73%) ⬆️
...ice/streamingdispatch/PendingReadEntryRequest.java 0.00% <0.00%> (-68.19%) ⬇️
...ervice/streamingdispatch/StreamingEntryReader.java 0.00% <0.00%> (-60.24%) ⬇️
...istentStreamingDispatcherSingleActiveConsumer.java 0.00% <0.00%> (-50.52%) ⬇️
...ersistentStreamingDispatcherMultipleConsumers.java 0.00% <0.00%> (-45.55%) ⬇️
...ker/loadbalance/impl/LeastLongTermMessageRate.java 73.33% <0.00%> (-20.00%) ⬇️
...lsar/broker/loadbalance/impl/ThresholdShedder.java 27.04% <0.00%> (-3.28%) ⬇️
...balance/impl/SimpleResourceAllocationPolicies.java 51.42% <0.00%> (-2.86%) ⬇️
... and 1591 more

@congbobo184 congbobo184 merged commit 96f4161 into apache:master Feb 1, 2023
Technoboy- pushed a commit that referenced this pull request Feb 8, 2023
… txn race condition. (#19201)

liangyepianzhou pushed a commit that referenced this pull request Feb 9, 2023
… txn race condition. (#19201)

(cherry picked from commit 96f4161)
nicoloboschi pushed a commit to datastax/pulsar that referenced this pull request Feb 28, 2023
… txn race condition. (apache#19201)

(cherry picked from commit 96f4161)
(cherry picked from commit 5dd13ec)
@coderzc
Member

coderzc commented Mar 2, 2023

@thetumbled Can you help cherry-pick this PR to branch-2.9?

@thetumbled
Member Author

@thetumbled Can you help cherry-pick this PR to branch-2.9?

ok.

coderzc pushed a commit that referenced this pull request Mar 3, 2023
@coderzc coderzc added the cherry-picked/branch-2.9 Archived: 2.9 is end of life label Mar 3, 2023
Annavar-satish pushed a commit to pandio-com/pulsar that referenced this pull request Mar 6, 2023

Development

Successfully merging this pull request may close these issues.

[Bug] [broker] Timeout transaction do not end.
