Skip to content

Conversation

@315157973
Copy link
Contributor

Fixes #7626

Motivation

Expose MessagesImpl ,so that we can ack list of messages by using CompletableFuture<Void> acknowledgeAsync(Messages<?> messages)

Modifications

Change the visibility level of the method from protect to public

Verifying this change

unit test:
org.apache.pulsar.client.api.ConsumerBatchReceiveTest#testBatchAck

@merlimat
Copy link
Contributor

merlimat commented Jul 29, 2020

I don't understand the reason for this change. there's no performance improvement in this versus just acking the individual messages

Copy link
Contributor

@codelipenghui codelipenghui left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There are some restrictions when using the MessagesImpl, I think we can add new methods acknowledge(List<Message> msgs) and acknowledge(List<MessageIds> msgIds), for some users that they already have a message list for message ID list, they can simpler batch ack messages.

@pg2404
Copy link

pg2404 commented Jul 30, 2020

Thank you @codelipenghui.
@merlimat - there is a lot of scope of performance improvement in the scenario I described - where I am receiving a batch of messages, grouping them and processing them - and only on successful processing of the batch (or even partial success), would need to send ack - by the time I am in a position to send an ack/negative ack I have lost the ordering of message ids and cumulativeAck will not work (I might need some of the intermediate messages redelivered) - Right now I am having to group these messages and loop over these groups to send ack - with this change I can maintain the message Ids successfully processed and fire a single async call with the list.

@sijie
Copy link
Member

sijie commented Jul 30, 2020

@pg2404 I understand the motivation. However, I am not sure we are implementing this in the right way.

In your use case, can't you use batchReceive to receive a batch of messages and ack the whole batch?

If we want to enhance the API to support batch acknowledgment, doesn't it make more sense to support

ack(List<Message> msgs);
ack(List<Messages> msgBatches);

@pg2404
Copy link

pg2404 commented Jul 30, 2020

@sijie - I was only looking for suggestions to implement my use case really - any guidance would be helpful even if a new API is not exposed. My basic pain point being, even if I do a receive a batch - I cannot guarantee that the entire batch will fail or pass in the single shot - I have to group the data on certain metadata and divide the messages into smaller batches and write them somewhere - if the entire process succeeds for the entire batch only then can I send the ack. Suppose, out of the batch I have 4 sub groups and out of which 2 fails I cannot ack the batch - the ack has to be done per message.

@315157973
Copy link
Contributor Author

Requirements:
Receive a batch of messages (BatchA), but only ack some of them (BatchB), and only call one API to ack BatchB

Solution:
Change the visibility level of the method from protect to public,so that we can ack list of messages by using acknowledge(Messages<?> messages)

Suggestion:
1)Since the internal implementation of the acknowledge(Messages<?> messages) method is still a circular ack, merlimat believes that there is no performance improvement.
2)codelipenghui believes that acknowledge (Messages<?> messages) should not be reused, because Messages has restrictions, and acknowledge(List<MessageIds> msgIds) should be added。Although I prefer to use Messages。

Solution2:

  1. add method acknowledge(List<MessageId> msgIds)
  2. add batch individual acknowledgment to PersistentAcknowledgmentsGroupingTracker, ack a batch of messages only requires one request instead of circular requests

@merlimat @codelipenghui @sijie

@315157973
Copy link
Contributor Author

Please review @merlimat @sijie

@sijie sijie changed the title Expose MessagesImpl Support acknowledging a list of messages Aug 13, 2020
@sijie sijie added this to the 2.7.0 milestone Aug 13, 2020
@sijie sijie added the type/feature The PR added a new feature or issue requested a new feature label Aug 13, 2020
@codelipenghui
Copy link
Contributor

/pulsarbot run-failure-checks

@codelipenghui codelipenghui merged commit e06e872 into apache:master Aug 17, 2020
huangdx0726 pushed a commit to huangdx0726/pulsar that referenced this pull request Aug 24, 2020
Fixes apache#7626


### Motivation
Expose `MessagesImpl` ,so that we can ack list of messages by using `CompletableFuture<Void> acknowledgeAsync(Messages<?> messages)`


### Modifications
Change the visibility level of the method from protect to public

### Verifying this change
unit test:
org.apache.pulsar.client.api.ConsumerBatchReceiveTest#testBatchAck
@315157973 315157973 deleted the ackList branch September 3, 2020 15:52
lbenc135 pushed a commit to lbenc135/pulsar that referenced this pull request Sep 5, 2020
Fixes apache#7626


### Motivation
Expose `MessagesImpl` ,so that we can ack list of messages by using `CompletableFuture<Void> acknowledgeAsync(Messages<?> messages)`


### Modifications
Change the visibility level of the method from protect to public

### Verifying this change
unit test:
org.apache.pulsar.client.api.ConsumerBatchReceiveTest#testBatchAck
lbenc135 pushed a commit to lbenc135/pulsar that referenced this pull request Sep 5, 2020
Fixes apache#7626


### Motivation
Expose `MessagesImpl` ,so that we can ack list of messages by using `CompletableFuture<Void> acknowledgeAsync(Messages<?> messages)`


### Modifications
Change the visibility level of the method from protect to public

### Verifying this change
unit test:
org.apache.pulsar.client.api.ConsumerBatchReceiveTest#testBatchAck
lbenc135 pushed a commit to lbenc135/pulsar that referenced this pull request Sep 5, 2020
Fixes apache#7626


### Motivation
Expose `MessagesImpl` ,so that we can ack list of messages by using `CompletableFuture<Void> acknowledgeAsync(Messages<?> messages)`


### Modifications
Change the visibility level of the method from protect to public

### Verifying this change
unit test:
org.apache.pulsar.client.api.ConsumerBatchReceiveTest#testBatchAck
merlimat pushed a commit to merlimat/pulsar that referenced this pull request Dec 19, 2020
Fixes apache#7626


### Motivation
Expose `MessagesImpl` ,so that we can ack list of messages by using `CompletableFuture<Void> acknowledgeAsync(Messages<?> messages)`


### Modifications
Change the visibility level of the method from protect to public

### Verifying this change
unit test:
org.apache.pulsar.client.api.ConsumerBatchReceiveTest#testBatchAck
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

type/feature The PR added a new feature or issue requested a new feature

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Acknowledging list of messages in Pulsar consumer java client

6 participants