[improve][java-client] Improve performance of multi-topic consumer with more than one IO thread #16336
Conversation
```java
        .maxNumBytes(-1)
        .timeout(0, TimeUnit.MILLISECONDS)
        .build();
configurationData.setBatchReceivePolicy(internalBatchReceivePolicy);
```
There are 2 other places creating ConsumerImpl in this class.
@Jason918 Fixed.
```java
private void receiveMessageFromConsumer(ConsumerImpl<T> consumer) {
    consumer.receiveAsync().thenAcceptAsync(message -> {
        CompletableFuture<List<Message<T>>> messagesFuture;
        if (consumer.numMessagesInQueue() >= 10) {
```
Why 10?
Just to make sure it's worth using the batch receive API. If fewer than 10 messages are in the internal consumer's queue, the batch receive gains little, and each batch receive operation also allocates an ArrayList. And I think we'd better not introduce a new configuration option here, which would make the client configuration more complex.
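The trade-off being discussed can be sketched outside Pulsar with a plain queue (names like `BATCH_THRESHOLD` and `drainOrSingle` are hypothetical, not from the PR; a real Pulsar consumer would call `receiveAsync()`/`batchReceiveAsync()` instead):

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

// Illustrative sketch: choose between a single receive and a batch receive
// based on how many messages are already buffered. Below the threshold,
// a batch receive mostly pays the cost of allocating the result list
// for little throughput gain.
public class ReceiveStrategy {
    static final int BATCH_THRESHOLD = 10; // hypothetical, mirrors the hard-coded 10

    static <T> List<T> drainOrSingle(Queue<T> incoming) {
        if (incoming.size() >= BATCH_THRESHOLD) {
            // Batch path: drain everything currently buffered in one pass.
            List<T> batch = new ArrayList<>(incoming.size());
            T msg;
            while ((msg = incoming.poll()) != null) {
                batch.add(msg);
            }
            return batch;
        }
        // Single-message path: take at most one message, no extra allocation.
        T single = incoming.poll();
        return single == null ? List.of() : List.of(single);
    }

    public static void main(String[] args) {
        Queue<Integer> q = new ArrayDeque<>();
        for (int i = 0; i < 12; i++) {
            q.add(i);
        }
        System.out.println(drainOrSingle(q).size()); // 12 messages: batch path
        System.out.println(drainOrSingle(q).size()); // queue now empty: 0
    }
}
```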
```java
protected abstract void updateAutoScaleReceiverQueueHint();

protected boolean hasEnoughMessagesForBatchReceive() {
    if (batchReceivePolicy.getTimeoutMs() <= 0) {
```
It seems like a breaking change: if the user sets this to 0, batchReceive() would immediately return a Messages object with no messages when there is nothing to consume.
@Jason918 Fixed; using 1 ms instead.
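Why a 0 ms timeout behaves so differently from 1 ms can be sketched with a plain BlockingQueue standing in for the consumer's incoming queue (a hypothetical simplification, not the Pulsar source): a zero timeout never waits, so an empty queue immediately yields an empty result.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Illustrative sketch: poll with timeout 0 returns immediately, so an
// empty queue produces an empty "batch"; with a 1 ms timeout there is
// at least a brief wait for a message to arrive.
public class TimeoutDemo {
    static <T> boolean waitedForMessage(LinkedBlockingQueue<T> q, long timeoutMs)
            throws InterruptedException {
        // poll(0, ...) never blocks; poll(n, ...) waits up to n units
        return q.poll(timeoutMs, TimeUnit.MILLISECONDS) != null;
    }

    public static void main(String[] args) throws InterruptedException {
        LinkedBlockingQueue<String> q = new LinkedBlockingQueue<>();
        // Empty queue, timeout 0: returns immediately with nothing.
        System.out.println(waitedForMessage(q, 0)); // false
        q.add("msg");
        // Message available within the 1 ms window: it is returned.
        System.out.println(waitedForMessage(q, 1)); // true
    }
}
```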
```java
private void receiveMessageFromConsumer(ConsumerImpl<T> consumer) {
    consumer.receiveAsync().thenAcceptAsync(message -> {
        CompletableFuture<List<Message<T>>> messagesFuture;
        if (consumer.numMessagesInQueue() >= 10) {
```
Since it is not configurable, should we test out a good threshold? Is 10 the best value?
Can we add a new batchReceive implementation that directly takes as many messages as possible from incomingMessages? That way you wouldn't need to check numMessagesInQueue against a threshold. Something similar to running this logic:
```java
MessagesImpl<T> messages = getNewMessagesImpl();
Message<T> msgPeeked = incomingMessages.peek();
while (msgPeeked != null && messages.canAdd(msgPeeked)) {
    Message<T> msg = incomingMessages.poll();
    if (msg != null) {
        messageProcessed(msg);
        if (!isValidConsumerEpoch(msg)) {
            msgPeeked = incomingMessages.peek();
            continue;
        }
        Message<T> interceptMsg = beforeConsume(msg);
        messages.add(interceptMsg);
    }
    msgPeeked = incomingMessages.peek();
}
```
Seeing your latest changes: ah, it just calls batch receive directly.
Force-pushed a950955 to 258593b.
```diff
  // Call receiveAsync() if the incoming queue is not full. Because this block is run with
  // thenAcceptAsync, there is no chance for recursion that would lead to stack overflow.
- receiveMessageFromConsumer(consumer);
+ receiveMessageFromConsumer(consumer, messages.size() > 0);
```
If receiveMessageFromConsumer is entered from this line and messages.size() == 0 in this cycle, is there any chance batch receive gets called again in the future?
Yes. If messages.size() == 0, the current round receives messages from the internal consumer with consumer.receiveAsync(). Once the internal consumer has new incoming messages, the next round will use batchReceiveAsync() again.
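The alternating behavior described above can be sketched with plain CompletableFuture chaining (hypothetical names; the real code lives in MultiTopicsConsumerImpl): each round schedules the next one inside thenAcceptAsync, so the "recursion" happens on fresh executor stacks and the batch path is chosen only when the previous round actually produced messages.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

// Illustrative sketch of the receive loop: thenAcceptAsync runs the
// continuation on the executor, so scheduling the next round from inside
// it cannot grow the call stack without bound.
public class ReceiveLoop {
    static void runRound(ExecutorService pool, int round, int maxRounds,
                         AtomicInteger batchRounds, CountDownLatch done) {
        // Pretend even rounds receive 3 messages and odd rounds receive none.
        int received = (round % 2 == 0) ? 3 : 0;
        CompletableFuture.completedFuture(received)
            .thenAcceptAsync(n -> {
                if (n > 0) {
                    // Previous round had messages: next round would take
                    // the batch-receive path.
                    batchRounds.incrementAndGet();
                }
                if (round + 1 < maxRounds) {
                    runRound(pool, round + 1, maxRounds, batchRounds, done);
                } else {
                    done.countDown();
                }
            }, pool);
    }

    public static void main(String[] args) throws InterruptedException {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        AtomicInteger batchRounds = new AtomicInteger();
        CountDownLatch done = new CountDownLatch(1);
        runRound(pool, 0, 5, batchRounds, done);
        done.await();
        System.out.println(batchRounds.get()); // rounds 0, 2, 4 had messages: 3
        pool.shutdown();
    }
}
```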
… consumer with more than one IO thread

### Motivation

After running the test with a partitioned topic and 4 IO threads:

```
bin/pulsar-perf produce test -r 500000 -s 1 -mk random -o 10000 -threads 2
bin/pulsar-perf consume test -q 100000 -ioThreads 4
```

The consumer got very bad performance:

```
Profiling started
2022-07-01T23:03:13,230+0800 [main] INFO org.apache.pulsar.testclient.PerformanceConsumer - Throughput received: 216223 msg --- 21517.245 msg/s --- 0.164 Mbit/s --- Latency: mean: 2702.319 ms - med: 2644 - 95pct: 4594 - 99pct: 4758 - 99.9pct: 4844 - 99.99pct: 4854 - Max: 4854
2022-07-01T23:03:23,494+0800 [main] INFO org.apache.pulsar.testclient.PerformanceConsumer - Throughput received: 812896 msg --- 58004.006 msg/s --- 0.443 Mbit/s --- Latency: mean: 9826.540 ms - med: 9992 - 95pct: 12905 - 99pct: 13210 - 99.9pct: 13284 - 99.99pct: 13288 - Max: 13288
2022-07-01T23:03:33,506+0800 [main] INFO org.apache.pulsar.testclient.PerformanceConsumer - Throughput received: 1548826 msg --- 73501.942 msg/s --- 0.561 Mbit/s --- Latency: mean: 18000.596 ms - med: 18038 - 95pct: 22170 - 99pct: 22501 - 99.9pct: 22585 - 99.99pct: 22593 - Max: 22594
2022-07-01T23:03:43,519+0800 [main] INFO org.apache.pulsar.testclient.PerformanceConsumer - Throughput received: 2271634 msg --- 72190.579 msg/s --- 0.551 Mbit/s --- Latency: mean: 26907.162 ms - med: 26914 - 95pct: 30770 - 99pct: 31196 - 99.9pct: 31290 - 99.99pct: 31297 - Max: 31298
2022-07-01T23:03:53,527+0800 [main] INFO org.apache.pulsar.testclient.PerformanceConsumer - Throughput received: 2852575 msg --- 58021.315 msg/s --- 0.443 Mbit/s --- Latency: mean: 35547.844 ms - med: 35503 - 95pct: 39662 - 99pct: 40044 - 99.9pct: 40120 - 99.99pct: 40129 - Max: 40130
2022-07-01T23:04:03,539+0800 [main] INFO org.apache.pulsar.testclient.PerformanceConsumer - Throughput received: 3317622 msg --- 46424.278 msg/s --- 0.354 Mbit/s --- Latency: mean: 44615.969 ms - med: 44597 - 95pct: 48807 - 99pct: 49149 - 99.9pct: 49218 - 99.99pct: 49224 - Max: 49225
2022-07-01T23:04:13,554+0800 [main] INFO org.apache.pulsar.testclient.PerformanceConsumer - Throughput received: 3715533 msg --- 39746.763 msg/s --- 0.303 Mbit/s --- Latency: mean: 53487.733 ms - med: 53445 - 95pct: 57427 - 99pct: 58195 - 99.9pct: 58413 - 99.99pct: 58425 - Max: 58443
```

Use batch receive in the MultiTopicsConsumerImpl internals to fix the performance issue.

After this PR:

```
Profiling started
2022-07-02T00:15:33,291+0800 [main] INFO org.apache.pulsar.testclient.PerformanceConsumer - Throughput received: 91938234 msg --- 500221.875 msg/s --- 3.816 Mbit/s --- Latency: mean: 17.755 ms - med: 18 - 95pct: 23 - 99pct: 28 - 99.9pct: 34 - 99.99pct: 36 - Max: 36
2022-07-02T00:15:43,308+0800 [main] INFO org.apache.pulsar.testclient.PerformanceConsumer - Throughput received: 96929487 msg --- 498127.011 msg/s --- 3.800 Mbit/s --- Latency: mean: 27.666 ms - med: 18 - 95pct: 80 - 99pct: 98 - 99.9pct: 104 - 99.99pct: 105 - Max: 106
2022-07-02T00:15:53,328+0800 [main] INFO org.apache.pulsar.testclient.PerformanceConsumer - Throughput received: 101955467 msg --- 501660.867 msg/s --- 3.827 Mbit/s --- Latency: mean: 19.226 ms - med: 18 - 95pct: 32 - 99pct: 59 - 99.9pct: 66 - 99.99pct: 67 - Max: 68
2022-07-02T00:16:03,356+0800 [main] INFO org.apache.pulsar.testclient.PerformanceConsumer - Throughput received: 106959191 msg --- 499143.020 msg/s --- 3.808 Mbit/s --- Latency: mean: 132.371 ms - med: 28 - 95pct: 474 - 99pct: 505 - 99.9pct: 511 - 99.99pct: 515 - Max: 515
2022-07-02T00:16:13,378+0800 [main] INFO org.apache.pulsar.testclient.PerformanceConsumer - Throughput received: 111982006 msg --- 501017.414 msg/s --- 3.822 Mbit/s --- Latency: mean: 21.249 ms - med: 18 - 95pct: 53 - 99pct: 74 - 99.9pct: 81 - 99.99pct: 83 - Max: 84
```
Force-pushed 258593b to 07280ff.
…th more than one IO thread (apache#16336) (cherry picked from commit bdda1eb) (cherry picked from commit 1649ef4)
…th more than one IO thread (apache#16336)
Motivation
After running the test with a partitioned topic (with only 1 partition) and 4 IO threads, the consumer got very bad performance:
consumer01.html.txt
Use batch receive in the MultiTopicsConsumerImpl internal to fix the performance issue.
After this PR:
consumer2.html.txt
Documentation

Check the box below or label this PR directly.

Need to update docs?

- doc-required (Your PR needs to update docs and you will update later)
- doc-not-needed (Please explain why)
- doc (Your PR contains doc changes)
- doc-complete (Docs have already been added)