Fix pulsar-io-thread block forever #946
Conversation
|
Is it related to #942? |
yes |
kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaCommandDecoder.java
|
In addition, is there a way to reproduce it? I think we can configure |
Both maxQueuedRequests and numIOThreads are set to 1, which should make it easier to reproduce |
|
@BewareMyPower I use the common ordered executor to send responses. PTAL, thanks!
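(For readers following the fix: below is a minimal, hedged sketch of the "send responses on a shared ordered executor" idea. The class and method names are illustrative only, not the actual KoP code; the point is that responses for the same channel always go to the same thread, so per-channel ordering is preserved while the pulsar-io thread no longer drains the queue itself.)

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Illustrative sketch only -- ResponseSender, numSendKafkaResponseThreads and the
// Runnable parameter are made-up names, not the real KoP classes.
public class ResponseSender {

    private final ExecutorService[] sendThreads;

    public ResponseSender(int numSendKafkaResponseThreads) {
        sendThreads = new ExecutorService[numSendKafkaResponseThreads];
        for (int i = 0; i < numSendKafkaResponseThreads; i++) {
            final int idx = i;
            sendThreads[i] = Executors.newSingleThreadExecutor(
                    r -> new Thread(r, "send-kafka-response-" + idx));
        }
    }

    // Responses for the same channel always run on the same single thread, so they
    // keep their order, and the pulsar-io thread only submits a task here instead
    // of blocking on the request queue.
    public void sendResponse(Object channel, Runnable writeAndFlushResponseToClient) {
        int idx = (channel.hashCode() & Integer.MAX_VALUE) % sendThreads.length;
        sendThreads[idx].execute(writeAndFlushResponseToClient);
    }
}
```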
|
Nice catch! |
|
LGTM. I will approve it after I have a simple test for the following configs. Please also take a look, @hangc0276 @Demogorgon314 |
aloyszhang
left a comment
LGTM
|
I think this PR still doesn't fix the problem. Here are my logs of KoP standalone. I tested it simply with the following extra configs:
maxQueuedRequests=1
sendKafkaResponseThreads=1
And sent messages via the Kafka CMD tool:
$ ./bin/kafka-producer-perf-test.sh --topic my-topic --num-records 10000 --throughput 1000 --record-size 1024 --producer.config temp.properties
[2021-12-02 18:35:32,240] WARN [Producer clientId=producer-1] Got error produce response with correlation id 5 on topic-partition my-topic-0, retrying (2147483646 attempts left). Error: NETWORK_EXCEPTION (org.apache.kafka.clients.producer.internals.Sender)
[2021-12-02 18:35:32,241] WARN [Producer clientId=producer-1] Received invalid metadata error in produce request on partition my-topic-0 due to org.apache.kafka.common.errors.NetworkException: The server disconnected before a response was received.. Going to request metadata update now (org.apache.kafka.clients.producer.internals.Sender)
... |
|
Before this PR, once the pulsar-io thread got blocked putting a request into the queue, it would stay blocked forever. This PR solves that problem.
|
How large is the bookkeeper thread pool? You can increase the number of bookkeeper threads to improve processing capacity. In addition, is bookkeeper's managedLedgerDefaultEnsembleSize set to 1?
|
Sorry I've made a mistake. I configured I think the root cause is that the value of |
kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaServiceConfiguration.java
Demogorgon314
left a comment
LGTM. The default value of numSendKafkaResponseThreads might need some benchmark tests to determine.
So we can set the default value to 4? @Demogorgon314 @hangc0276 |
|
Did you compare it with other values? I think you can give a simple comparison with |
When using Runtime.getRuntime().availableProcessors(), the number of send-response threads on my test broker is 24
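(As a quick illustration of why that default is too aggressive, here is a tiny self-contained snippet. The Math.min(4, ...) cap is just the fixed default being discussed in this thread, not code from the PR.)

```java
public class DefaultSendThreads {
    public static void main(String[] args) {
        // Prints 24 on the commenter's test broker -- too many threads
        // just for flushing Kafka responses.
        int available = Runtime.getRuntime().availableProcessors();
        // Hypothetical capped default, matching the fixed value (4) proposed above.
        int numSendKafkaResponseThreads = Math.min(4, available);
        System.out.println(available + " cores -> " + numSendKafkaResponseThreads + " send threads");
    }
}
```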
|
It makes sense. |
When the requestQueue is full, the pulsar-io thread blocks forever in requestQueue.put(). requestQueue.remove() is then never called, because the writeAndFlushResponseToClient method is also executed on the same pulsar-io thread:
` ctx.channel().eventLoop().execute(() -> {
writeAndFlushResponseToClient(channel);
});`
This causes a lot of connections stuck in CLOSE_WAIT on the server side.
Related to #942
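(To make the failure mode easier to see, here is a minimal, self-contained sketch of the pattern described above. It is not the KoP code, and the class and variable names are made up: a single "event-loop" thread fills a bounded queue and schedules the drain back onto itself, so the second put() can never return.)

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class EventLoopQueueDeadlock {

    public static void main(String[] args) throws Exception {
        // Stand-in for a single pulsar-io event-loop thread, with maxQueuedRequests = 1.
        ExecutorService eventLoop = Executors.newSingleThreadExecutor(r -> {
            Thread t = new Thread(r, "pulsar-io");
            t.setDaemon(true);
            return t;
        });
        BlockingQueue<String> requestQueue = new ArrayBlockingQueue<>(1);

        eventLoop.execute(() -> {
            try {
                // First request fills the bounded queue.
                requestQueue.put("request-1");

                // The "response write" that would drain the queue is scheduled back onto
                // the SAME event loop, like ctx.channel().eventLoop().execute(...).
                eventLoop.execute(requestQueue::remove);

                // Second request: put() blocks forever, because the drain task above can
                // only run on this very thread, which is now stuck here.
                requestQueue.put("request-2");
                System.out.println("never reached");
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        TimeUnit.SECONDS.sleep(3);
        // In the real broker this shows up as connections piling up in CLOSE_WAIT.
        System.out.println("event loop still blocked, queue size = " + requestQueue.size());
    }
}
```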
(cherry picked from commit c4ce316)
This reverts commit c4ce316.

