Skip to content

Conversation

@jerrypeng
Copy link
Contributor

Same as #9316 but with additional fixes

@jerrypeng jerrypeng added type/bug The PR fixed a bug or issue reported a bug area/client java release/2.7.1 labels Feb 20, 2021
@jerrypeng jerrypeng added this to the 2.8.0 milestone Feb 20, 2021
@jerrypeng jerrypeng requested a review from merlimat February 20, 2021 05:30
@jerrypeng jerrypeng self-assigned this Feb 20, 2021
@merlimat merlimat merged commit e017b10 into apache:master Feb 20, 2021
zymap pushed a commit that referenced this pull request Mar 1, 2021
* Fixed hasMessageAvailable() with empty topic

* Fixed test

* adding additional fixes

* fix comment

Co-authored-by: Matteo Merli <mmerli@apache.org>
Co-authored-by: Jerry Peng <jerryp@splunk.com>
(cherry picked from commit e017b10)
@zymap zymap added the cherry-picked/branch-2.7 Archived: 2.7 is end of life label Mar 1, 2021
zymap added a commit to zymap/pulsar that referenced this pull request Mar 4, 2021
zymap added a commit to zymap/pulsar that referenced this pull request Mar 4, 2021
---

Master Issue: #<xyz>

*Motivation*

PR apache#9652 fix the issue at master but it's base on the master
code. In the master code, we change the generate proto way and
it's difficult to cherry-pick that PR into the branch 2.7. So
I open a new PR to fix that issue at branch2.7.
zymap added a commit that referenced this pull request Mar 4, 2021
---

Master Issue: #<xyz>

*Motivation*

PR #9652 fix the issue at master but it's base on the master
code. In the master code, we change the generate proto way and
it's difficult to cherry-pick that PR into the branch 2.7. So
I open a new PR to fix that issue at branch2.7.
zymap added a commit to streamnative/pulsar-archived that referenced this pull request Mar 5, 2021
zymap added a commit to streamnative/pulsar-archived that referenced this pull request Mar 5, 2021
---

Master Issue: #<xyz>

*Motivation*

PR apache#9652 fix the issue at master but it's base on the master
code. In the master code, we change the generate proto way and
it's difficult to cherry-pick that PR into the branch 2.7. So
I open a new PR to fix that issue at branch2.7.
BewareMyPower added a commit that referenced this pull request Jan 24, 2022
…13883)

### Motivation

In C++ client, there is a corner case that when a reader's start message ID is the last message of a topic, `hasMessageAvailable` returns true. However, it should return false because the start message ID is exclusive and in this case `readNext` would never return a message unless new messages arrived.

### Modifications

The current C++ implementation of `hasMessageAvailable` is from long days ago and has many problems. So this PR migrates the Java implementation of `hasMessageAvailable` to C++ client.

Since after the modifications we need to access `startMessageId` in `hasMessageAvailable`, which is called in a different thread from `connectionOpened` that might modify `startMessageId`. We use a common mutex `mutexForMessageIds` to protect the access to `lastDequedMessageId_` and `lastMessageIdInBroker_`.

To fix the original tests when `startMessageId` is latest, this PR adds a `GetLastMessageIdResponse` as the response of `GetLastMessageId` request. The  `GetLastMessageIdResponse` contains the `consumer_mark_delete_position` introduced from #9652 to compare with `last_message_id` when `startMessageId` is latest.

### Verifying this change

This change added tests `ReaderTest#testHasMessageAvailableWhenCreated` and `MessageIdTest# testCompareLedgerAndEntryId`.
codelipenghui pushed a commit that referenced this pull request Jan 27, 2022
…13883)

In C++ client, there is a corner case that when a reader's start message ID is the last message of a topic, `hasMessageAvailable` returns true. However, it should return false because the start message ID is exclusive and in this case `readNext` would never return a message unless new messages arrived.

The current C++ implementation of `hasMessageAvailable` is from long days ago and has many problems. So this PR migrates the Java implementation of `hasMessageAvailable` to C++ client.

Since after the modifications we need to access `startMessageId` in `hasMessageAvailable`, which is called in a different thread from `connectionOpened` that might modify `startMessageId`. We use a common mutex `mutexForMessageIds` to protect the access to `lastDequedMessageId_` and `lastMessageIdInBroker_`.

To fix the original tests when `startMessageId` is latest, this PR adds a `GetLastMessageIdResponse` as the response of `GetLastMessageId` request. The  `GetLastMessageIdResponse` contains the `consumer_mark_delete_position` introduced from #9652 to compare with `last_message_id` when `startMessageId` is latest.

This change added tests `ReaderTest#testHasMessageAvailableWhenCreated` and `MessageIdTest# testCompareLedgerAndEntryId`.

(cherry picked from commit e50493e)
BewareMyPower added a commit that referenced this pull request Jan 27, 2022
…13883)

In C++ client, there is a corner case that when a reader's start message ID is the last message of a topic, `hasMessageAvailable` returns true. However, it should return false because the start message ID is exclusive and in this case `readNext` would never return a message unless new messages arrived.

The current C++ implementation of `hasMessageAvailable` is from long days ago and has many problems. So this PR migrates the Java implementation of `hasMessageAvailable` to C++ client.

Since after the modifications we need to access `startMessageId` in `hasMessageAvailable`, which is called in a different thread from `connectionOpened` that might modify `startMessageId`. We use a common mutex `mutexForMessageIds` to protect the access to `lastDequedMessageId_` and `lastMessageIdInBroker_`.

To fix the original tests when `startMessageId` is latest, this PR adds a `GetLastMessageIdResponse` as the response of `GetLastMessageId` request. The  `GetLastMessageIdResponse` contains the `consumer_mark_delete_position` introduced from #9652 to compare with `last_message_id` when `startMessageId` is latest.

This change added tests `ReaderTest#testHasMessageAvailableWhenCreated` and `MessageIdTest# testCompareLedgerAndEntryId`.

(cherry picked from commit e50493e)

Fix the conflicts by:
- Remove ReaderImpl::getLastMessageIdAsync introduced from #11723
- Remove getPriorityLevel() method introduced from #12076
- Revert changes of registerConsumer from #12118
- Remove new fields introduced from #13627
BewareMyPower added a commit that referenced this pull request Jan 27, 2022
…13883)

In C++ client, there is a corner case that when a reader's start message ID is the last message of a topic, `hasMessageAvailable` returns true. However, it should return false because the start message ID is exclusive and in this case `readNext` would never return a message unless new messages arrived.

The current C++ implementation of `hasMessageAvailable` is from long days ago and has many problems. So this PR migrates the Java implementation of `hasMessageAvailable` to C++ client.

Since after the modifications we need to access `startMessageId` in `hasMessageAvailable`, which is called in a different thread from `connectionOpened` that might modify `startMessageId`. We use a common mutex `mutexForMessageIds` to protect the access to `lastDequedMessageId_` and `lastMessageIdInBroker_`.

To fix the original tests when `startMessageId` is latest, this PR adds a `GetLastMessageIdResponse` as the response of `GetLastMessageId` request. The  `GetLastMessageIdResponse` contains the `consumer_mark_delete_position` introduced from #9652 to compare with `last_message_id` when `startMessageId` is latest.

This change added tests `ReaderTest#testHasMessageAvailableWhenCreated` and `MessageIdTest# testCompareLedgerAndEntryId`.

(cherry picked from commit e50493e)

Fix the conflicts by
- Remove new fields introduced from #13627
BewareMyPower added a commit that referenced this pull request Apr 2, 2022
…13883)

### Motivation

In C++ client, there is a corner case that when a reader's start message ID is the last message of a topic, `hasMessageAvailable` returns true. However, it should return false because the start message ID is exclusive and in this case `readNext` would never return a message unless new messages arrived.

### Modifications

The current C++ implementation of `hasMessageAvailable` is from long days ago and has many problems. So this PR migrates the Java implementation of `hasMessageAvailable` to C++ client.

Since after the modifications we need to access `startMessageId` in `hasMessageAvailable`, which is called in a different thread from `connectionOpened` that might modify `startMessageId`. We use a common mutex `mutexForMessageIds` to protect the access to `lastDequedMessageId_` and `lastMessageIdInBroker_`.

To fix the original tests when `startMessageId` is latest, this PR adds a `GetLastMessageIdResponse` as the response of `GetLastMessageId` request. The  `GetLastMessageIdResponse` contains the `consumer_mark_delete_position` introduced from #9652 to compare with `last_message_id` when `startMessageId` is latest.

### Verifying this change

This change added tests `ReaderTest#testHasMessageAvailableWhenCreated` and `MessageIdTest# testCompareLedgerAndEntryId`.

(cherry picked from commit e50493e)
BewareMyPower added a commit to BewareMyPower/pulsar that referenced this pull request Mar 26, 2024
…fter seeking by timestamp

### Motivation

After seeking by timestamp, `hasMessageAvailable()` and `readNext()`
could return wrong values.

The 1st bug was a regression introduced by
apache#22201, which modifies
`startMessageId` to `seekMessageId` before a seek operation is done.
However, the previous behavior is also a bug but accidentally works in
this case. When seeking by timestamp, the `seekMessageId` is modified to
`earliest`, which should not be compared with `lastMessageIdInBroker`
because the actual start position is determined by the seek timestamp,
not the `earliest` message id.

The 2nd bug was caused by apache#9652,
when `startMessageIdInclusive()` is configured to create a reader, it
could seek to the position of the latest message when
`lastDequeuedMessageId` is `earliest` and `startMessageId` is `latest`.

### Modifications

Add a boolean flag `hasSoughtByTimestamp` to represent whether the last
seek call accepts a timestamp. If it's true, don't take `startMessageId`
into comparison with `lastMessageIdInBroker`, just compare the
mark-delete position and last position in the GetLastMessageId response.

Add `testHasMessageAvailableAfterSeekTimestamp` to verify the change.

For the `readNext` call, if the reader has sought by timestamp, don't
seek to the latest position in `hasMessageAvailable`. Modify
`testReadMessageWithBatchingWithMessageInclusive` to verify the fix.
However, this patch does not modify the existing behavior when `seek` is
not called because the inclusive reader relies on the implicit seek
operation in `hasMessageAvailable`.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

area/client cherry-picked/branch-2.7 Archived: 2.7 is end of life release/2.7.1 type/bug The PR fixed a bug or issue reported a bug

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants