Skip to content

Conversation

@BewareMyPower
Copy link
Contributor

Motivation

Support key based batching for C++ client. This is a catch up work on #4435.

In addition, currently the implementation of BatchMessageContainer is coupling to ProducerImpl tightly. The batch message container registers a timer to producer's executor and the timeout callback is also producer's method. Even its add method could call sendMessage to send batch to producer's pending queue. These should be producer's work.

Modifications

  • Add a MessageAndCallbackBatch to store a MessageImpl of serialized single messages and a callback list.
  • Add a BatchMessageContainerBase to provide interface methods and methods like update/clear message number/bytes, create OpSendMsg.
  • Let ProducerImpl manage the batch timer and determine whether to create OpSendMsg from BatchMessageContainerBase and send.
  • Make BatchMessageContainer inherit BatchMessageContainerBase, it only manages a MessageAndCallbackBatch.
  • Add a BatchMessageKeyBasedContainer that inherits BatchMessageContainerBase, it manages a map of message key and MessageAndCallbackBatch.
  • Add a producer config to change batching type.
  • Add some units tests for key based batching and a unit test for key shared subscription.

Verifying this change

  • Make sure that the change passes the CI checks.

This change added tests and can be verified as follows:

  • Current tests affected by default batching type, like BatchMessageTest.* and BasicEndToEndTest.*
  • Newly added tests: KeyBasedBatchingTest.* and KeySharedConsumerTest.testKeyBasedBatching

Documentation

  • Does this pull request introduce a new feature? (yes)
  • If yes, how is the feature documented? (docs)

@jiazhai jiazhai merged commit e7c4074 into apache:master Sep 9, 2020
lbenc135 pushed a commit to lbenc135/pulsar that referenced this pull request Oct 3, 2020
### Motivation

Support key based batching for C++ client. This is a catch up work on apache#4435.

In addition, currently the implementation of `BatchMessageContainer` is coupling to `ProducerImpl` tightly. The  batch message container registers a timer to producer's executor and the timeout callback is also producer's method. Even its `add` method could call `sendMessage` to send batch to producer's pending queue. These should be producer's work.

### Modifications

- Add a `MessageAndCallbackBatch` to store a `MessageImpl` of serialized single messages and a callback list.
- Add a `BatchMessageContainerBase` to provide interface methods and methods like update/clear message number/bytes, create `OpSendMsg`.
- Let `ProducerImpl` manage the batch timer and determine whether to create `OpSendMsg` from `BatchMessageContainerBase` and send.
- Make `BatchMessageContainer` inherit `BatchMessageContainerBase`, it only manages a `MessageAndCallbackBatch`.
- Add a `BatchMessageKeyBasedContainer` that inherits `BatchMessageContainerBase`, it manages a map of message key and `MessageAndCallbackBatch`.
- Add a producer config to change batching type.
- Add some units tests for key based batching and a unit test for key shared subscription.

### Verifying this change
This change added tests and can be verified as follows: 

  - Current tests affected by default batching type, like `BatchMessageTest.*` and `BasicEndToEndTest.*`
  - Newly added tests: `KeyBasedBatchingTest.*` and `KeySharedConsumerTest.testKeyBasedBatching`



* Change ClientConnection::getMaxMessageSize() to static method

* Refactor the BatchMessageContainer

* Add batching type to producer config

* Fix testBigMessageSizeBatching error

* Add key based batching container

* Make batching type config work

* Add unit tests for key based batching and key shared subscription

* Fix flush test error possibly caused by timeout overflow

* Make BatchMessageKeyBasedContainer work for a single batch
@wolfstudy wolfstudy added this to the 2.7.0 milestone Oct 28, 2020
@wolfstudy
Copy link
Member

Move this change to 2.6.2, because #8191 depends on it.

merlimat pushed a commit to merlimat/pulsar that referenced this pull request Dec 19, 2020
### Motivation

Support key based batching for C++ client. This is a catch up work on apache#4435.

In addition, currently the implementation of `BatchMessageContainer` is coupling to `ProducerImpl` tightly. The  batch message container registers a timer to producer's executor and the timeout callback is also producer's method. Even its `add` method could call `sendMessage` to send batch to producer's pending queue. These should be producer's work.

### Modifications

- Add a `MessageAndCallbackBatch` to store a `MessageImpl` of serialized single messages and a callback list.
- Add a `BatchMessageContainerBase` to provide interface methods and methods like update/clear message number/bytes, create `OpSendMsg`.
- Let `ProducerImpl` manage the batch timer and determine whether to create `OpSendMsg` from `BatchMessageContainerBase` and send.
- Make `BatchMessageContainer` inherit `BatchMessageContainerBase`, it only manages a `MessageAndCallbackBatch`.
- Add a `BatchMessageKeyBasedContainer` that inherits `BatchMessageContainerBase`, it manages a map of message key and `MessageAndCallbackBatch`.
- Add a producer config to change batching type.
- Add some units tests for key based batching and a unit test for key shared subscription.

### Verifying this change
This change added tests and can be verified as follows: 

  - Current tests affected by default batching type, like `BatchMessageTest.*` and `BasicEndToEndTest.*`
  - Newly added tests: `KeyBasedBatchingTest.*` and `KeySharedConsumerTest.testKeyBasedBatching`



* Change ClientConnection::getMaxMessageSize() to static method

* Refactor the BatchMessageContainer

* Add batching type to producer config

* Fix testBigMessageSizeBatching error

* Add key based batching container

* Make batching type config work

* Add unit tests for key based batching and key shared subscription

* Fix flush test error possibly caused by timeout overflow

* Make BatchMessageKeyBasedContainer work for a single batch
@BewareMyPower BewareMyPower deleted the key-based-batch-dev branch August 24, 2021 13:04
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants