Skip to content

KAFKA-13309: fix InMemorySessionStore#fetch/backwardFetch order issue#11337

Merged
ableegoldman merged 3 commits intoapache:trunkfrom
showuon:KAFKA-13309
Sep 29, 2021
Merged

KAFKA-13309: fix InMemorySessionStore#fetch/backwardFetch order issue#11337
ableegoldman merged 3 commits intoapache:trunkfrom
showuon:KAFKA-13309

Conversation

@showuon
Copy link
Copy Markdown
Member

@showuon showuon commented Sep 18, 2021

JIRA is here.
In #9139, we added backward iterator on SessionStore. But there is a bug that when fetch/backwardFetch the key range, if there are multiple records in the same session window, we can't return the data in the correct order.

For example:
We have a session window inactivity gap with 10 ms, and the records:

key: "A", value: "AA", timestamp: 0 --> with SessionWindow(0, 0)
key: "B", value: "BB", timestamp: 0 --> with SessionWindow(0, 0)
key: "C", value: "CC", timestamp: 0 --> with SessionWindow(0, 0)
key: "D" value: "DD", timestamp: 100 --> with SessionWindow(100, 100)

So, when fetch("A" /key from/, "D" /key to/), we expected to have [A, B, C, D], but we'll have [C, B A, D ]

And the reason is that we pass "false" in the "is forward" parameter for fetch method, and "true" for "backwardFetch" method, which obviously is wrong.

So, why does the tests can't find this issue?
It's because the test data we provided doesn't have multiple data in the same session window.

In this PR, I fixed the issue, and add tests to improve the test coverage.

Committer Checklist (excluded from commit message)

  • Verify design and implementation
  • Verify test coverage and CI build status
  • Verify documentation (including upgrade notes)



return registerNewIterator(keyFrom, keyTo, Long.MAX_VALUE, endTimeMap.entrySet().iterator(), false);
return registerNewIterator(keyFrom, keyTo, Long.MAX_VALUE, endTimeMap.entrySet().iterator(), true);
Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fix: to make fetch method passing true for isFarwarded.


return registerNewIterator(
keyFrom, keyTo, Long.MAX_VALUE, endTimeMap.descendingMap().entrySet().iterator(), true);
keyFrom, keyTo, Long.MAX_VALUE, endTimeMap.descendingMap().entrySet().iterator(), false);
Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fix: to make backwardFetch method passing false for isFarwarded.

try (final KeyValueIterator<Windowed<String>, Long> values = sessionStore.findSessions(key, 0, 1000L)
) {
assertEquals(new HashSet<>(expected), toSet(values));
assertEquals(expected, toList(values));
Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

before this PR, we only verify the returned data contains in the expected data as Set. Now, we'll change to List, which means, not only the data should be correct, but also the data order should be correct.

final List<KeyValue<Windowed<String>, Long>> expected = new LinkedList<>();
expected.add(KeyValue.pair(new Windowed<>("aa", new SessionWindow(10, 10)), 2L));
expected.add(KeyValue.pair(new Windowed<>("aaa", new SessionWindow(100, 100)), 3L));
expected.add(KeyValue.pair(new Windowed<>("aaaa", new SessionWindow(100, 100)), 6L));
Copy link
Copy Markdown
Member Author

@showuon showuon Sep 18, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

add a record with the same SessionWindow(100, 100). So we can test if the fetch/backwardFetch can return data in correct order for both InMemorySessionStore and RocksDBSessionStore.

Comment on lines -466 to +475
final Set<String> expectedKey1 = new HashSet<>(asList("1", "4", "7"));
assertThat(valuesToSet(sessionStore.findSessions(key1, 0L, Long.MAX_VALUE)), equalTo(expectedKey1));
final Set<String> expectedKey2 = new HashSet<>(asList("2", "5", "8"));
assertThat(valuesToSet(sessionStore.findSessions(key2, 0L, Long.MAX_VALUE)), equalTo(expectedKey2));
final Set<String> expectedKey3 = new HashSet<>(asList("3", "6", "9"));
assertThat(valuesToSet(sessionStore.findSessions(key3, 0L, Long.MAX_VALUE)), equalTo(expectedKey3));
final List<String> expectedKey1 = asList("1", "4", "7");
try (KeyValueIterator<Windowed<Bytes>, String> iterator = sessionStore.findSessions(key1, 0L, Long.MAX_VALUE)) {
assertThat(valuesToSet(iterator), equalTo(new HashSet<>(expectedKey1)));
}
Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We forgot to close the KeyValueIterator instance returned by the findSessions. Fix it.

@showuon
Copy link
Copy Markdown
Member Author

showuon commented Sep 18, 2021

@jeqo @ableegoldman @vvcephei , please help review this PR. Thank you.

@showuon
Copy link
Copy Markdown
Member Author

showuon commented Sep 27, 2021

@jeqo @ableegoldman @vvcephei @guozhangwang , please help review this PR when available. Thank you.

Copy link
Copy Markdown
Member

@ableegoldman ableegoldman left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the patch! LGTM

@ableegoldman ableegoldman merged commit 361b784 into apache:trunk Sep 29, 2021
ableegoldman pushed a commit that referenced this pull request Sep 29, 2021
…#11337)

In #9139, we added backward iterator on SessionStore. But there is a bug that when fetch/backwardFetch the key range, if there are multiple records in the same session window, we can't return the data in the correct order.

Reviewers: Anna Sophie Blee-Goldman <ableegoldman<apache.org>
@ableegoldman
Copy link
Copy Markdown
Member

Merged to trunk and cherrypicked to 3.0

ableegoldman added a commit that referenced this pull request Sep 29, 2021
)

When cherrypicking #11337 back to the 3.0 branch and resolving a minor merge conflict, I forgot to git add the removal of some imports that were breaking checkstyle.

Reviewers: Natea Eshetu Beshada <nbeshada@confluent.io>, Walker Carlson <wcarlson@confluent.io>
xdgrulez pushed a commit to xdgrulez/kafka that referenced this pull request Dec 22, 2021
…apache#11337)

In apache#9139, we added backward iterator on SessionStore. But there is a bug that when fetch/backwardFetch the key range, if there are multiple records in the same session window, we can't return the data in the correct order.

Reviewers: Anna Sophie Blee-Goldman <ableegoldman<apache.org>
lmr3796 pushed a commit to lmr3796/kafka that referenced this pull request Jun 2, 2022
…apache#11337)

In apache#9139, we added backward iterator on SessionStore. But there is a bug that when fetch/backwardFetch the key range, if there are multiple records in the same session window, we can't return the data in the correct order.

Reviewers: Anna Sophie Blee-Goldman <ableegoldman<apache.org>
lmr3796 pushed a commit to lmr3796/kafka that referenced this pull request Jun 2, 2022
…che#11365)

When cherrypicking apache#11337 back to the 3.0 branch and resolving a minor merge conflict, I forgot to git add the removal of some imports that were breaking checkstyle.

Reviewers: Natea Eshetu Beshada <nbeshada@confluent.io>, Walker Carlson <wcarlson@confluent.io>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants