Description
Describe the bug
The max_total_receiver_queue_size_across_partitions kwarg to the Python subscribe method is nonfunctional.
To Reproduce
- Create a persistent partitioned topic. I used 4 partitions.
- Create a Shared subscription on the topic.
- Publish 10 messages to the topic using batching_type=BatchingType.KeyBased and a unique partition key for each message (this is not necessary with a Shared subscription, but is necessary to demonstrate that this bug also affects KeyShared subscriptions).
- Create a consumer on the topic with the below code, and ensure it prints Got message, sleeping forever.
- In a second terminal, start another consumer on the topic with the below code.
- Observe that the second consumer does not get a message.
- Publish additional messages to the topic.
- Observe that only after the second publish step does the consumer get messages.
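For the publish step above, a minimal producer sketch might look like the following. It is not the exact script used for this report: the helper names make_keyed_messages and publish, the message payloads, and the use of uuid for partition keys are illustrative assumptions. The topic name and service URL mirror the placeholders in the consumer code.

```python
import uuid

TOPIC = 'THETOPIC'  # same placeholder topic name as the consumer code


def make_keyed_messages(count=10):
    """Build (partition_key, payload) pairs with a distinct key per message.

    Hypothetical helper: any scheme that yields unique keys would do.
    """
    return [(uuid.uuid4().hex, f'message-{i}'.encode('utf-8')) for i in range(count)]


def publish(service_url='pulsar://localhost:6650', count=10):
    # Imported lazily so make_keyed_messages can be used without pulsar-client installed.
    from pulsar import BatchingType, Client

    client = Client(service_url=service_url)
    producer = client.create_producer(
        topic=TOPIC,
        batching_enabled=True,
        batching_type=BatchingType.KeyBased,
    )
    try:
        for key, payload in make_keyed_messages(count):
            # A unique partition key per message, as described in the steps above.
            producer.send(payload, partition_key=key)
        producer.flush()
    finally:
        client.close()


if __name__ == '__main__':
    publish()
```

Running this once covers the initial publish; running it again covers the "publish additional messages" step.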
Consumer code:
import os
import time

from pulsar import Client, ConsumerType, Timeout

TOPIC = 'THETOPIC'
SUBSCRIPTION = 'THESUBSCRIPTION'


def main():
    client = Client(service_url='pulsar://localhost:6650')
    sub = client.subscribe(
        topic=TOPIC,
        subscription_name=SUBSCRIPTION,
        consumer_type=ConsumerType.Shared,
        max_total_receiver_queue_size_across_partitions=1,
        consumer_name=f'testconsumer-{os.getpid()}'
    )
    # Poll with a 100 ms timeout until the first message arrives.
    while True:
        try:
            msg = sub.receive(100)
            mid = msg.message_id()
            print("partition:", mid.partition(), "ledger:", mid.ledger_id(), "entry:", mid.entry_id(), "batch:", mid.batch_index())
            break
        except Timeout:
            pass
    # Deliberately never acknowledge or receive again; just hold the consumer open.
    print("Got message, sleeping forever")
    while True:
        time.sleep(1)


if __name__ == '__main__':
    main()
Expected behavior
The second consumer should receive messages from the topic immediately upon startup. The first consumer should only prevent the second consumer from getting max_total_receiver_queue_size_across_partitions messages.
I'm not sure what setting max_total_receiver_queue_size_across_partitions to 0 should do. That's not documented, and probably should be. These docs indicate that it should behave equivalently to a value of 1 with regard to other consumers' ability to get messages.
I'm not sure what the interaction is between receiver_queue_size and max_total_receiver_queue_size_across_partitions; that should be documented as well, but as part of apache/pulsar#15702.
Additional context
Once the backlog reaches roughly 320 messages (given my message size), the second consumer does get data when it starts. I don't know why that cutoff exists.
Environment:
(chariot) zac.bentley@ZacBentleyMBP ~/Desktop/Projects/Klaviyo/chariot ∴ arch
i386
(chariot) zac.bentley@ZacBentleyMBP ~/Desktop/Projects/Klaviyo/chariot ∴ sw_vers
ProductName: macOS
ProductVersion: 12.3.1
BuildVersion: 21E258
(chariot) zac.bentley@ZacBentleyMBP ~/Desktop/Projects/Klaviyo/chariot ∴ brew info apache-pulsar
apache-pulsar: stable 2.10.0 (bottled), HEAD
Cloud-native distributed messaging and streaming platform
https://pulsar.apache.org/
/usr/local/Cellar/apache-pulsar/2.10.0 (1,018 files, 949.7MB) *
Poured from bottle on 2022-05-13 at 12:10:54
From: https://github.com/Homebrew/homebrew-core/blob/HEAD/Formula/apache-pulsar.rb
License: Apache-2.0
(chariot) zac.bentley@ZacBentleyMBP ~/Desktop/Projects/Klaviyo/chariot ∴ python --version
Python 3.7.13
(chariot) zac.bentley@ZacBentleyMBP ~/Desktop/Projects/Klaviyo/chariot ∴ pip show pulsar-client
Name: pulsar-client
Version: 2.10.0
Summary: Apache Pulsar Python client library
Home-page: https://pulsar.apache.org/
Author: Pulsar Devs
Author-email: dev@pulsar.apache.org
License: Apache License v2.0
Location: /Users/zac.bentley/Desktop/Projects/Klaviyo/chariot/.venv/lib/python3.7/site-packages
Requires: certifi, six