Skip to content

Conversation

@shibd
Copy link
Member

@shibd shibd commented Oct 7, 2022

Motivation

apache/pulsar#17140

This PR has been reviewed in pulsar repo.

Modifications

  • Consumer support batch receives messages.
  • Abstract common implementation to ConsumerImplBase.

Documentation

  • doc

Matching PR in forked repository

shibd#1

@shibd shibd changed the title [feat][cpp] Consumer support batch receive messages. [feat] Consumer support batch receive messages. Oct 7, 2022
@shibd
Copy link
Member Author

shibd commented Oct 8, 2022

This PR ci has can be passed. shibd#1

@merlimat merlimat added this to the 3.0.0 milestone Oct 9, 2022
Comment on lines 129 to 131
Lock lock(batchPendingReceiveMutex_);
notifyBatchPendingReceivedCallback(callback);
lock.unlock();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
Lock lock(batchPendingReceiveMutex_);
notifyBatchPendingReceivedCallback(callback);
lock.unlock();
notifyBatchPendingReceivedCallback(callback);

IIUC, batchPendingReceiveMutex_ is used to make the accesses to batchPendingReceives_ thread safe. We don't need the lock here.

Copy link
Member Author

@shibd shibd Oct 11, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, you are right. batchPendingReceiveMutex_ is only used to make the accesses to batchPendingReceives_ thread safe. I mixed it up.

Here need a lock to make hasEnoughMessagesForBatchReceive and notifyBatchPendingReceivedCallback is atomic, then prevents users receive not compliant with the policy messages.

I add a new batchReceiveOptionMutex_ lock to sync them.

But, This is not entirely guaranteed, Because I didn't synchronize the scenario where a single receive message is incomingQueue_ out of the queue.

In Java client impl, These operations are all performed in internalPinnedExecutor threads. So no problem.

I think it can be done this way first, and then I'll try to refactor it so that C++ also uses an internal thread to avoid adding various locks.

@merlimat merlimat modified the milestones: 3.0.0, 3.1.0 Oct 12, 2022
Copy link
Contributor

@BewareMyPower BewareMyPower left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I just found another problem.

@BewareMyPower BewareMyPower merged commit 7f7653b into apache:main Oct 18, 2022
merlimat pushed a commit to merlimat/pulsar-client-cpp that referenced this pull request Oct 20, 2022
### Motivation
apache/pulsar#17140

This PR has been reviewed in [pulsar repo](apache/pulsar#17429).

### Modifications
- Consumer support batch receives messages.
- Abstract common implementation to `ConsumerImplBase`.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants