[Client] Fix endless receiveAsync loop in MultiTopicsConsumer #12044
eolivelli
left a comment
Good catch!
+1
I am sure you know better than I do that those Thread.sleep() calls smell a little, but I am not sure how to avoid them without adding more and more instrumentation (probably with PowerMock?).
    .subscribe();

    // wait for background tasks to start
    Thread.sleep(1000L);
We cannot be sure that this task will have started within 1 second.
I think we should use Awaitility.
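To illustrate the difference between a fixed sleep and a condition-based wait, here is a minimal sketch in plain Java. `PollingAwait` and `awaitUntil` are hypothetical names used only for illustration; this is a stand-in for the idea behind Awaitility, not Awaitility's API and not code from this PR.

```java
import java.time.Duration;
import java.util.function.BooleanSupplier;

// Hypothetical stand-in for an Awaitility-style wait; not part of Pulsar or Awaitility.
final class PollingAwait {
    private PollingAwait() {}

    /** Polls the condition until it is true, or fails once the timeout has elapsed. */
    static void awaitUntil(BooleanSupplier condition, Duration timeout, Duration pollInterval) {
        long deadline = System.nanoTime() + timeout.toNanos();
        while (!condition.getAsBoolean()) {
            if (System.nanoTime() - deadline >= 0) {
                throw new IllegalStateException("Condition not met within " + timeout);
            }
            try {
                Thread.sleep(pollInterval.toMillis());
            } catch (InterruptedException e) {
                // Restore the interrupt flag and give up waiting.
                Thread.currentThread().interrupt();
                throw new IllegalStateException("Interrupted while waiting", e);
            }
        }
    }
}
```

Unlike `Thread.sleep(1000L)`, the wait returns as soon as the condition holds and fails loudly if it never does. With Awaitility itself, the equivalent would presumably look like `Awaitility.await().atMost(Duration.ofSeconds(5)).until(...)` with a condition suited to this test.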
Given the implementation of internalReceiveAsync in MultiTopicsConsumerImpl, I think it would be sufficient to modify this test to await the receipt of a single message from the multi-topic consumer. After receiving a single message, the individual topic consumer will be added to the pendingReceives queue and will subsequently be failed with an AlreadyClosedException when the multi-topic consumer is closed. That exception will then be handled in receiveMessageFromConsumer and will exercise the fix in this PR. The one downside of this approach is that our test would be tightly coupled to the implementation. Let me know what you think, @lhotari.
michaeljmarshall
left a comment
LGTM. I think we can improve the test; please see my comment.
Note: this bug leads to a leak of runnable tasks in some cases. For example, when we have a topic consumer in the pendingConsumers queue and we close the multi-topic consumer, we'll fail the futures and get into this infinite retry loop. If the Pulsar client is left open for a while, and multi-topic consumers are opened and closed many times, the number of runnable tasks will continue to increase with each new consumer. Note that if the Pulsar client itself is closed, the internal executor service is closed, which stops the leak.
…#12044) Fixes apache#12024 (cherry picked from commit bb80c5b)
…erge request !100) Squash merge branch 'consumer-close' into '2.8.1' Fix the issue where the MQ proxy prints a large number of error logs after a consumer is closed TAPD: --bug=102239121
…erge request !100) (merge request !101) Squash merge branch 'consumer-loop' into 'release-2.8.1.4' A large number of error logs are printed after a consumer is closed TAPD: --bug=102239121
Fixes #12024
Motivation
See #12024 for details. The changes in PR #11843 cause a critical cleanup problem for multi-topic consumers in Pulsar Client 2.8.1.
Modifications
Ignore AlreadyClosedExceptions in the exceptionally handler that was added in PR #11843, so that a closed consumer no longer triggers a retry.
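The shape of this change can be sketched with plain `CompletableFuture` code. This is a simplified model under stated assumptions, not the actual MultiTopicsConsumerImpl code: `ReceiveLoopSketch`, its nested `AlreadyClosedException`, the immediate retry, and the `maxCalls` safety bound are all hypothetical and exist only to keep the example self-contained and terminating.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

// Simplified model of a receive loop; all names here are illustrative assumptions.
final class ReceiveLoopSketch {
    /** Stand-in for the client's "already closed" exception type. */
    static final class AlreadyClosedException extends RuntimeException {
        AlreadyClosedException(String message) { super(message); }
    }

    private final Supplier<CompletableFuture<String>> receiveAsync;
    private final int maxCalls; // safety bound for this sketch only
    final AtomicInteger receiveCalls = new AtomicInteger();

    ReceiveLoopSketch(Supplier<CompletableFuture<String>> receiveAsync, int maxCalls) {
        this.receiveAsync = receiveAsync;
        this.maxCalls = maxCalls;
    }

    /** Helper that builds an already-failed future. */
    static CompletableFuture<String> failed(Throwable error) {
        CompletableFuture<String> future = new CompletableFuture<>();
        future.completeExceptionally(error);
        return future;
    }

    void receiveNext() {
        if (receiveCalls.get() >= maxCalls) {
            return; // bound reached; without it the non-closed failure case would loop forever
        }
        receiveCalls.incrementAndGet();
        receiveAsync.get()
            .thenAccept(message -> receiveNext())
            .exceptionally(ex -> {
                Throwable cause = (ex instanceof CompletionException && ex.getCause() != null)
                        ? ex.getCause() : ex;
                if (cause instanceof AlreadyClosedException) {
                    return null; // consumer is closed: stop instead of retrying
                }
                receiveNext(); // any other failure: retry (immediately, in this sketch)
                return null;
            });
    }
}
```

Without the `AlreadyClosedException` check, a closed consumer would keep failing and keep rescheduling itself, which is the endless loop described in #12024.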