My program will have eight producers: two send to partitioned topics and six send to non-partitioned topics.
Below is my producer configuration:
import pulsar
from pulsar import BatchingType

# `client` is an existing pulsar.Client instance
producer = client.create_producer(
    producer_name='raw-test',
    batching_enabled=True,
    block_if_queue_full=True,
    batching_type=BatchingType.KeyBased,
    batching_max_messages=2000,
    batching_max_allowed_size_in_bytes=1000 * 1024,
    batching_max_publish_delay_ms=20,
    compression_type=pulsar.CompressionType.ZSTD,
    send_timeout_millis=0,
    topic='persistent://ethereum-test/raw/tests'
)
I am sending messages with send_async(). After receiving the warning "Received send error from the server: Cannot determine whether the message is a duplicate at this time", I get many "Connection closed" messages.
The problem is intermittent. It goes away if I either remove batching_type=BatchingType.KeyBased or turn off batching entirely.
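Since the failure is intermittent, one way to measure how often it happens is to tally the result passed to each send_async() callback. The following is only a sketch under assumptions: make_send_callback and stats are hypothetical names introduced for illustration, and the success value is passed in so the helper does not depend on a running broker (with the real client it would be pulsar.Result.Ok).

```python
from collections import Counter


def make_send_callback(stats, ok_result):
    """Build a callback for producer.send_async(content, callback).

    stats: a Counter that tallies outcomes by name (hypothetical helper state).
    ok_result: the value that signals success; with the real client this
               would be pulsar.Result.Ok (assumption stated in the lead-in).
    """
    def callback(result, msg_id):
        if result == ok_result:
            stats["ok"] += 1
        else:
            # Count each failure reason separately so intermittent
            # errors can be compared across batching configurations.
            stats[str(result)] += 1
    return callback


# Usage with the real client (assumes `producer` was created as shown earlier):
#   import pulsar
#   stats = Counter()
#   cb = make_send_callback(stats, pulsar.Result.Ok)
#   producer.send_async(b"payload", cb, partition_key="some-key")
#   producer.flush()
#   print(stats)
```

Running the same workload once with key-based batching and once with batching disabled, then comparing the two tallies, would show whether the errors really disappear or only become rarer.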
Here are my debug logs:
pulsar_log.txt
pulsar-client version is 2.10.1
pulsar cluster version is 2.10.2