@merlimat
Contributor

Motivation

Currently, the seek() operation first disconnects all connected consumers and then resets the cursor. This was done initially to simplify the logic and avoid concurrent operations.

This change handles that case now, so that consumers are not disconnected and reconnected while the seek is performed.

@merlimat merlimat added the type/enhancement The enhancements for the existing features or docs. e.g. reduce memory usage of the delayed messages label Sep 25, 2019
@merlimat merlimat added this to the 2.5.0 milestone Sep 25, 2019
@merlimat merlimat self-assigned this Sep 25, 2019
@codelipenghui
Contributor

@merlimat I have checked the logic but could not find how it handles the case where the cursor is reset to a historical position and a consumer then calls acknowledgeCumulative() with a messageId that was dispatched to that consumer before the reset. If all consumers are disconnected while the cursor is reset, the client will create new consumers, so that's not going to happen. Maybe I missed something.

@codelipenghui
Contributor

retest this please

@codelipenghui
Contributor

run java8 tests

@sijie sijie requested a review from jiazhai October 24, 2019 09:13
@sijie
Member

sijie commented Oct 24, 2019

@merlimat If we don't disconnect the consumers, how do consumers reset their state? e.g. pending queue, acknowledgements, etc.

@merlimat
Contributor Author

If we don't disconnect the consumers, how do consumers reset their state? e.g. pending queue, acknowledgements, etc.

Consumers won't reset; they will just see the messages restart from an earlier (or later) point in time. I don't believe that should affect the consumer logic, since there will be no visible disruption from the consumer's perspective.

@merlimat
Contributor Author

If all consumers are disconnected while the cursor is reset, the client will create new consumers, so that's not going to happen

It can still happen, and it actually becomes more likely the longer the ack is delayed after the message is received.

Even if the consumer session is recreated, a consumer can receive a message, then a seek is triggered, and then the consumer calls ack-cumulative, returning the cursor to the previous position.

An application that seeks while acking (either individually or cumulatively) is bound to see unpredictable behavior in any case, irrespective of the disconnection.
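
To make the interleaving described above concrete, here is a minimal toy model. It is only a sketch: `ToyCursor` and its methods are hypothetical stand-ins, not Pulsar's actual cursor implementation, and it assumes a cumulative ack simply moves the cursor to the acked position.

```java
// Toy model only: ToyCursor is hypothetical and is NOT Pulsar's ManagedCursor.
final class SeekAckRaceSketch {

    /** Minimal stand-in for a subscription cursor; positions are plain longs. */
    static final class ToyCursor {
        // Highest cumulatively acknowledged position (-1 means nothing acked yet).
        private long markDelete = -1;

        // seek(): dispatch restarts at `position`.
        void seek(long position) {
            markDelete = position - 1;
        }

        // Assumption of this sketch: a cumulative ack moves the cursor unconditionally.
        void ackCumulative(long position) {
            markDelete = position;
        }

        long nextToDispatch() {
            return markDelete + 1;
        }
    }

    /** Replays: message 10 is received, a seek to 3 happens, then 10 is acked. */
    static long replay() {
        ToyCursor cursor = new ToyCursor();
        cursor.ackCumulative(9);        // messages 0..9 were acked earlier
        long inFlight = 10;             // message 10 was delivered, not yet acked
        cursor.seek(3);                 // the application rewinds to message 3
        cursor.ackCumulative(inFlight); // late ack for a message received before the seek
        return cursor.nextToDispatch();
    }

    public static void main(String[] args) {
        System.out.println("next message to dispatch: " + replay());
    }
}
```

In this model the late cumulative ack undoes the rewind whether or not the consumer session was recreated in between, which is the point made in the comment.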

@wolfstudy wolfstudy modified the milestones: 2.5.0, 2.4.2 Nov 6, 2019
@wolfstudy
Member

@merlimat I changed the milestone to 2.4.2.

@jiazhai
Member

jiazhai commented Nov 6, 2019

It seems these test failures are related to this change:

org.apache.pulsar.compaction.CompactionTest.testCompactorReadsCompacted
org.apache.pulsar.compaction.CompactionTest.testSeekEarliestAfterCompaction
org.apache.pulsar.compaction.CompactorTest.testCompactAddCompact

@jiazhai
Member

jiazhai commented Nov 6, 2019

If we don't disconnect the consumers, how do consumers reset their state? e.g. pending queue, acknowledgements, etc.

Consumers won't reset; they will just see the messages restart from an earlier (or later) point in time. I don't believe that should affect the consumer logic, since there will be no visible disruption from the consumer's perspective.

Maybe Sijie was concerned about the messages that are already cached in the consumer client. We may need some handling to clear the cached messages?

@sijie
Member

sijie commented Nov 6, 2019

@merlimat I'm not sure how ordering is guaranteed if we don't clean up the cached messages. Can you clarify?
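
The ordering concern about cached messages can be illustrated with a small toy model. This is a sketch under stated assumptions: `ToyConsumer`, its receiver queue, and the message ids are all hypothetical and do not reflect the real client internals. It only shows what an application would observe if messages prefetched before the seek stay in the local queue.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Toy model only: ToyConsumer is hypothetical, not the Pulsar client's consumer.
final class StalePrefetchSketch {

    /** Consumer with a local queue of prefetched message ids. */
    static final class ToyConsumer {
        private final Deque<Long> receiverQueue = new ArrayDeque<>();

        void onDispatch(long messageId) {
            receiverQueue.addLast(messageId);
        }

        void clearQueue() {
            receiverQueue.clear();
        }

        // Returns everything the application would receive, in order.
        List<Long> drain() {
            List<Long> out = new ArrayList<>(receiverQueue);
            receiverQueue.clear();
            return out;
        }
    }

    /** Messages 5..7 are prefetched, the cursor is reset to 0, then 0..2 arrive. */
    static List<Long> receiveAcrossSeek(boolean clearOnSeek) {
        ToyConsumer consumer = new ToyConsumer();
        for (long id = 5; id <= 7; id++) {
            consumer.onDispatch(id);   // prefetched before the seek
        }
        if (clearOnSeek) {
            consumer.clearQueue();     // what a disconnect/reconnect effectively does
        }
        for (long id = 0; id <= 2; id++) {
            consumer.onDispatch(id);   // dispatched after the cursor reset
        }
        return consumer.drain();
    }

    public static void main(String[] args) {
        System.out.println("without clearing: " + receiveAcrossSeek(false));
        System.out.println("with clearing:    " + receiveAcrossSeek(true));
    }
}
```

Without clearing, the application in this model sees the stale messages first and then the rewound ones, so message order jumps backwards mid-stream.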

@sijie
Member

sijie commented Nov 8, 2019

Moved this to 2.5.0 since I think it requires some more discussion about the side effects of this change.

@sijie sijie modified the milestones: 2.4.2, 2.5.0, 2.4.3 Nov 8, 2019
@sijie
Member

sijie commented Nov 8, 2019

actually moved to 2.4.3

@sijie
Member

sijie commented Jan 24, 2020

@bsideup thank you for your updates. Do these N sessions belong to one subscription?

@bsideup
Member

bsideup commented Jan 24, 2020

@sijie yes, they listen to the same topic / partition with failover

@sijie
Member

sijie commented Jan 25, 2020

@bsideup sorry. If they are in the same subscription, then from a correctness perspective all consumers in that subscription are expected to reset to the new cursor position when the cursor is reset. What I see can be improved here is that when the broker disconnects consumers, it can disconnect only the active ones. For an exclusive/failover subscription, it will only reset the consumer that is currently active. This seems to be a correct fix. @codelipenghui @merlimat what do you think?
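
The proposal above could be sketched roughly as follows. This is not the broker's code: `SubType`, `consumersToDisconnect`, and the consumer names are hypothetical, and treating every consumer of a shared subscription as active is an assumption of this sketch rather than something stated in the thread.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model only: names and types are hypothetical, not Pulsar broker classes.
final class ActiveOnlyDisconnectSketch {

    enum SubType { EXCLUSIVE, FAILOVER, SHARED }

    /**
     * Picks which connected consumers to disconnect when the cursor is reset.
     * For exclusive/failover subscriptions only the currently active consumer
     * is chosen; standby consumers keep their connection.
     */
    static List<String> consumersToDisconnect(SubType type,
                                              List<String> connected,
                                              String active) {
        if (type == SubType.EXCLUSIVE || type == SubType.FAILOVER) {
            List<String> result = new ArrayList<>();
            if (active != null && connected.contains(active)) {
                result.add(active);
            }
            return result;
        }
        // Assumption: in other subscription types every consumer receives messages.
        return new ArrayList<>(connected);
    }

    public static void main(String[] args) {
        List<String> connected = new ArrayList<>();
        connected.add("c1");
        connected.add("c2");
        connected.add("c3");
        System.out.println(consumersToDisconnect(SubType.FAILOVER, connected, "c2"));
    }
}
```

With N sessions on one failover subscription, only one of them would be disconnected per seek in this model, instead of all N.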

@codelipenghui
Contributor

@merlimat I moved it to 2.7.0; feel free to move it back if you want to include it in 2.6.0.

@codelipenghui codelipenghui modified the milestones: 2.6.0, 2.7.0 May 19, 2020
@bsideup
Member

bsideup commented May 19, 2020

@codelipenghui it is sad to see it being moved over and over 😞

It is a blocker for failover subscriptions with external offset management, and the implemented behaviour is rather questionable.

What's the reason to move it (again)?

@codelipenghui
Contributor

@bsideup Thanks for your feedback. I hadn't noticed that it is a blocker. I will move it back to 2.6.0. Could you please take a look at sijie's last comment? Does that approach work for you?

@codelipenghui
Contributor

@bsideup I have pushed PR #7141, which is based on sijie's comment #5278 (comment). Could you please help review the changes?

@bsideup
Member

bsideup commented Jun 2, 2020

@codelipenghui looks promising! Looking forward to trying it 👍

codelipenghui added a commit that referenced this pull request Jun 3, 2020
)

Related to #5278 

### Motivation

Only close active consumer for Failover subscription when seek().

### Verifying this change

Unit tests added

### Does this pull request potentially affect one of the following parts:

*If `yes` was chosen, please highlight the changes*

  - Dependencies (does it add or upgrade a dependency): (no)
  - The public API: (no)
  - The schema: (no)
  - The default values of configurations: (no)
  - The wire protocol: (no)
  - The rest endpoints: (no)
  - The admin cli options: (no)
  - Anything that affects deployment: (no)

### Documentation

  - Does this pull request introduce a new feature? (no)
@codelipenghui
Contributor

@bsideup PR #7141 is merged. You can try it out on the master branch. I will move this PR to 2.7.0 first since we need to prepare the 2.6.0 release.

@codelipenghui codelipenghui modified the milestones: 2.6.0, 2.7.0 Jun 4, 2020
huangdx0726 pushed a commit to huangdx0726/pulsar that referenced this pull request Aug 24, 2020
…ache#7141)

Related to apache#5278 

### Motivation

Only close active consumer for Failover subscription when seek().

### Verifying this change

Unit tests added

### Does this pull request potentially affect one of the following parts:

*If `yes` was chosen, please highlight the changes*

  - Dependencies (does it add or upgrade a dependency): ( no)
  - The public API: (no)
  - The schema: (no)
  - The default values of configurations: (no)
  - The wire protocol: (no)
  - The rest endpoints: (no)
  - The admin cli options: (no)
  - Anything that affects deployment: (no)

### Documentation

  - Does this pull request introduce a new feature? (no)
@lhotari
Member

lhotari commented Sep 14, 2020

PR #7141 is merged. You can try it out on the master branch. I will move this PR to 2.7.0 first since we need to prepare the 2.6.0 release.

@codelipenghui PR #7141 doesn't seem to help with Pulsar Readers, since Readers always use exclusive subscriptions under the covers (`consumerConfiguration.setSubscriptionType(SubscriptionType.Exclusive);`).

Since a Pulsar Reader wraps a Consumer internally, I was thinking of using a NonDurable Consumer directly, without the Reader "wrapper". However, this approach fails because creating NonDurable Consumers remains broken, which I have reported as #7436.

@codelipenghui would it be possible to extend the solution from #7141 to also cover Readers?
I'm having issues using seekAsync + hasMessageAvailableAsync on Readers. There seem to be race conditions because the current solution disconnects from the broker side after the seek. Using seek + hasMessageAvailableAsync in Readers isn't deterministic at all. There is a separate issue about the problems with hasMessageAvailable after seek: #7796.
