-
Notifications
You must be signed in to change notification settings - Fork 3.7k
limiting batch size to the minimum of the maxNumberOfMessages and maxSizeOfMessages #6865
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
|
/pulsarbot run-failure-checks |
9 similar comments
|
/pulsarbot run-failure-checks |
|
/pulsarbot run-failure-checks |
|
/pulsarbot run-failure-checks |
|
/pulsarbot run-failure-checks |
|
/pulsarbot run-failure-checks |
|
/pulsarbot run-failure-checks |
|
/pulsarbot run-failure-checks |
|
/pulsarbot run-failure-checks |
|
/pulsarbot run-failure-checks |
|
/pulsarbot run-failure-checks |
2 similar comments
|
/pulsarbot run-failure-checks |
|
/pulsarbot run-failure-checks |
|
would like to remove it from 2.5.2 |
|
/pulsarbot run-failure-checks |
2 similar comments
|
/pulsarbot run-failure-checks |
|
/pulsarbot run-failure-checks |
|
@avimas Could you please help rebase to the master branch? There are some CI test fix on master branch. |
Done! |
|
@codelipenghui rebase done, I'll make sure to use a new branch next time. |
|
/pulsarbot run-failure-checks |
|
@avimas Could you please take a look at the failed CI tests? look related to this PR. |
|
move to 2.7.0 first. |
…SizeOfMessages from the BatchReceive policy
…calling internalBatchReceiveAsync
|
/pulsarbot run-failure-checks |
1 similar comment
|
/pulsarbot run-failure-checks |
…SizeOfMessages (#6865) Batch size is not limited to the minimum of the maxNumberOfMessages and maxSizeOfMessages from the BatchRecieve policy. 1. Batch size is not limited to the minimum of the maxNumberOfMessages and maxSizeOfMessages from the BatchRecieve policy. I think this issue can be easly fixed by changing the canAdd function in MessagesImpl from: ``` protected boolean canAdd(Message<T> message) { if (this.maxNumberOfMessages <= 0 && this.maxSizeOfMessages <= 0L) { return true; } else { return this.maxNumberOfMessages > 0 && this.currentNumberOfMessages + 1 <= this.maxNumberOfMessages || this.maxSizeOfMessages > 0L && this.currentSizeOfMessages + (long)message.getData().length <= this.maxSizeOfMessages; } } ``` to (changing the condintion in the else to && instead of ||): ``` protected boolean canAdd(Message<T> message) { if (this.maxNumberOfMessages <= 0 && this.maxSizeOfMessages <= 0L) { return true; } else { return (this.maxNumberOfMessages > 0 && this.currentNumberOfMessages + 1 <= this.maxNumberOfMessages) && (this.maxSizeOfMessages > 0L && this.currentSizeOfMessages + (long)message.getData().length <= this.maxSizeOfMessages); } } ``` 2. When the batch size is higher than the recieveQ of the consumer (I used a batch size of 3000 and a receiveQ of 500) I noticed the following issues: a. In a mutliTopic (pattern) consumer the client stops receiving any messages I think it getting paused and never resumed when setting a timeout in the batch policy, only one batch is fetched and the client never resumed. talked with @codelipenghui and advised to open this pull request (cherry picked from commit 941ddd6)
…SizeOfMessages (apache#6865) Batch size is not limited to the minimum of the maxNumberOfMessages and maxSizeOfMessages from the BatchRecieve policy. 1. Batch size is not limited to the minimum of the maxNumberOfMessages and maxSizeOfMessages from the BatchRecieve policy. I think this issue can be easly fixed by changing the canAdd function in MessagesImpl from: ``` protected boolean canAdd(Message<T> message) { if (this.maxNumberOfMessages <= 0 && this.maxSizeOfMessages <= 0L) { return true; } else { return this.maxNumberOfMessages > 0 && this.currentNumberOfMessages + 1 <= this.maxNumberOfMessages || this.maxSizeOfMessages > 0L && this.currentSizeOfMessages + (long)message.getData().length <= this.maxSizeOfMessages; } } ``` to (changing the condintion in the else to && instead of ||): ``` protected boolean canAdd(Message<T> message) { if (this.maxNumberOfMessages <= 0 && this.maxSizeOfMessages <= 0L) { return true; } else { return (this.maxNumberOfMessages > 0 && this.currentNumberOfMessages + 1 <= this.maxNumberOfMessages) && (this.maxSizeOfMessages > 0L && this.currentSizeOfMessages + (long)message.getData().length <= this.maxSizeOfMessages); } } ``` 2. When the batch size is higher than the recieveQ of the consumer (I used a batch size of 3000 and a receiveQ of 500) I noticed the following issues: a. In a mutliTopic (pattern) consumer the client stops receiving any messages I think it getting paused and never resumed when setting a timeout in the batch policy, only one batch is fetched and the client never resumed. talked with @codelipenghui and advised to open this pull request
Batch size is not limited to the minimum of the maxNumberOfMessages and maxSizeOfMessages from the BatchRecieve policy.
I think this issue can be easly fixed by changing the canAdd function in MessagesImpl from:
to (changing the condintion in the else to && instead of ||):
a. In a mutliTopic (pattern) consumer the client stops receiving any messages I think it getting paused and never resumed when setting a timeout in the batch policy, only one batch is fetched and the client never resumed.
talked with @codelipenghui and advised to open this pull request