Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -281,7 +281,7 @@ public KeyValueIterator<Windowed<Bytes>, byte[]> fetch(final Bytes keyFrom, fina
removeExpiredSegments();


return registerNewIterator(keyFrom, keyTo, Long.MAX_VALUE, endTimeMap.entrySet().iterator(), false);
return registerNewIterator(keyFrom, keyTo, Long.MAX_VALUE, endTimeMap.entrySet().iterator(), true);
Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fix: to make fetch method passing true for isFarwarded.

}

@Override
Expand All @@ -292,7 +292,7 @@ public KeyValueIterator<Windowed<Bytes>, byte[]> backwardFetch(final Bytes keyFr
removeExpiredSegments();

return registerNewIterator(
keyFrom, keyTo, Long.MAX_VALUE, endTimeMap.descendingMap().entrySet().iterator(), true);
keyFrom, keyTo, Long.MAX_VALUE, endTimeMap.descendingMap().entrySet().iterator(), false);
Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fix: to make backwardFetch method passing false for isFarwarded.

}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,15 +47,15 @@
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;

import static java.util.Arrays.asList;
import static org.apache.kafka.common.utils.Utils.mkEntry;
import static org.apache.kafka.common.utils.Utils.mkMap;
import static org.apache.kafka.test.StreamsTestUtils.toSet;
import static org.apache.kafka.common.utils.Utils.toList;
import static org.apache.kafka.test.StreamsTestUtils.valuesToSet;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.hasItem;
Expand Down Expand Up @@ -121,15 +121,15 @@ public void shouldPutAndFindSessionsInRange() {

try (final KeyValueIterator<Windowed<String>, Long> values = sessionStore.findSessions(key, 0, 1000L)
) {
assertEquals(new HashSet<>(expected), toSet(values));
assertEquals(expected, toList(values));
Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

before this PR, we only verify the returned data contains in the expected data as Set. Now, we'll change to List, which means, not only the data should be correct, but also the data order should be correct.

}

final List<KeyValue<Windowed<String>, Long>> expected2 =
Collections.singletonList(KeyValue.pair(a2, 2L));

try (final KeyValueIterator<Windowed<String>, Long> values2 = sessionStore.findSessions(key, 400L, 600L)
) {
assertEquals(new HashSet<>(expected2), toSet(values2));
assertEquals(expected2, toList(values2));
}
}

Expand All @@ -143,28 +143,29 @@ public void shouldPutAndBackwardFindSessionsInRange() {
sessionStore.put(new Windowed<>(key, new SessionWindow(1500L, 2000L)), 1L);
sessionStore.put(new Windowed<>(key, new SessionWindow(2500L, 3000L)), 2L);

final List<KeyValue<Windowed<String>, Long>> expected =
asList(KeyValue.pair(a1, 1L), KeyValue.pair(a2, 2L));
final LinkedList<KeyValue<Windowed<String>, Long>> expected = new LinkedList<>();
expected.add(KeyValue.pair(a1, 1L));
expected.add(KeyValue.pair(a2, 2L));

try (final KeyValueIterator<Windowed<String>, Long> values = sessionStore.backwardFindSessions(key, 0, 1000L)) {
assertEquals(new HashSet<>(expected), toSet(values));
assertEquals(toList(expected.descendingIterator()), toList(values));
}

final List<KeyValue<Windowed<String>, Long>> expected2 =
Collections.singletonList(KeyValue.pair(a2, 2L));

try (final KeyValueIterator<Windowed<String>, Long> values2 = sessionStore.backwardFindSessions(key, 400L, 600L)) {
assertEquals(new HashSet<>(expected2), toSet(values2));
assertEquals(expected2, toList(values2));
}
}

@Test
public void shouldFetchAllSessionsWithSameRecordKey() {
final List<KeyValue<Windowed<String>, Long>> expected = Arrays.asList(
KeyValue.pair(new Windowed<>("a", new SessionWindow(0, 0)), 1L),
KeyValue.pair(new Windowed<>("a", new SessionWindow(10, 10)), 2L),
KeyValue.pair(new Windowed<>("a", new SessionWindow(100, 100)), 3L),
KeyValue.pair(new Windowed<>("a", new SessionWindow(1000, 1000)), 4L));
final LinkedList<KeyValue<Windowed<String>, Long>> expected = new LinkedList<>();
expected.add(KeyValue.pair(new Windowed<>("a", new SessionWindow(0, 0)), 1L));
expected.add(KeyValue.pair(new Windowed<>("a", new SessionWindow(10, 10)), 2L));
expected.add(KeyValue.pair(new Windowed<>("a", new SessionWindow(100, 100)), 3L));
expected.add(KeyValue.pair(new Windowed<>("a", new SessionWindow(1000, 1000)), 4L));

for (final KeyValue<Windowed<String>, Long> kv : expected) {
sessionStore.put(kv.key, kv.value);
Expand All @@ -174,18 +175,17 @@ public void shouldFetchAllSessionsWithSameRecordKey() {
sessionStore.put(new Windowed<>("aa", new SessionWindow(0, 0)), 5L);

try (final KeyValueIterator<Windowed<String>, Long> values = sessionStore.fetch("a")) {
assertEquals(new HashSet<>(expected), toSet(values));
assertEquals(expected, toList(values));
}
}

@Test
public void shouldBackwardFetchAllSessionsWithSameRecordKey() {
final List<KeyValue<Windowed<String>, Long>> expected = asList(
KeyValue.pair(new Windowed<>("a", new SessionWindow(0, 0)), 1L),
KeyValue.pair(new Windowed<>("a", new SessionWindow(10, 10)), 2L),
KeyValue.pair(new Windowed<>("a", new SessionWindow(100, 100)), 3L),
KeyValue.pair(new Windowed<>("a", new SessionWindow(1000, 1000)), 4L)
);
final LinkedList<KeyValue<Windowed<String>, Long>> expected = new LinkedList<>();
expected.add(KeyValue.pair(new Windowed<>("a", new SessionWindow(0, 0)), 1L));
expected.add(KeyValue.pair(new Windowed<>("a", new SessionWindow(10, 10)), 2L));
expected.add(KeyValue.pair(new Windowed<>("a", new SessionWindow(100, 100)), 3L));
expected.add(KeyValue.pair(new Windowed<>("a", new SessionWindow(1000, 1000)), 4L));

for (final KeyValue<Windowed<String>, Long> kv : expected) {
sessionStore.put(kv.key, kv.value);
Expand All @@ -195,18 +195,18 @@ public void shouldBackwardFetchAllSessionsWithSameRecordKey() {
sessionStore.put(new Windowed<>("aa", new SessionWindow(0, 0)), 5L);

try (final KeyValueIterator<Windowed<String>, Long> values = sessionStore.backwardFetch("a")) {
assertEquals(new HashSet<>(expected), toSet(values));
assertEquals(toList(expected.descendingIterator()), toList(values));
}
}

@Test
public void shouldFetchAllSessionsWithinKeyRange() {
final List<KeyValue<Windowed<String>, Long>> expected = Arrays.asList(
KeyValue.pair(new Windowed<>("aa", new SessionWindow(10, 10)), 2L),
KeyValue.pair(new Windowed<>("b", new SessionWindow(1000, 1000)), 4L),

KeyValue.pair(new Windowed<>("aaa", new SessionWindow(100, 100)), 3L),
KeyValue.pair(new Windowed<>("bb", new SessionWindow(1500, 2000)), 5L));
final List<KeyValue<Windowed<String>, Long>> expected = new LinkedList<>();
expected.add(KeyValue.pair(new Windowed<>("aa", new SessionWindow(10, 10)), 2L));
expected.add(KeyValue.pair(new Windowed<>("aaa", new SessionWindow(100, 100)), 3L));
expected.add(KeyValue.pair(new Windowed<>("aaaa", new SessionWindow(100, 100)), 6L));
Copy link
Copy Markdown
Member Author

@showuon showuon Sep 18, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

add a record with the same SessionWindow(100, 100). So we can test if the fetch/backwardFetch can return data in correct order for both InMemorySessionStore and RocksDBSessionStore.

expected.add(KeyValue.pair(new Windowed<>("b", new SessionWindow(1000, 1000)), 4L));
expected.add(KeyValue.pair(new Windowed<>("bb", new SessionWindow(1500, 2000)), 5L));

for (final KeyValue<Windowed<String>, Long> kv : expected) {
sessionStore.put(kv.key, kv.value);
Expand All @@ -217,19 +217,22 @@ public void shouldFetchAllSessionsWithinKeyRange() {
sessionStore.put(new Windowed<>("bbb", new SessionWindow(2500, 3000)), 6L);

try (final KeyValueIterator<Windowed<String>, Long> values = sessionStore.fetch("aa", "bb")) {
assertEquals(new HashSet<>(expected), toSet(values));
assertEquals(expected, toList(values));
}

try (final KeyValueIterator<Windowed<String>, Long> values = sessionStore.findSessions("aa", "bb", 0L, Long.MAX_VALUE)) {
assertEquals(expected, toList(values));
}
}

@Test
public void shouldBackwardFetchAllSessionsWithinKeyRange() {
final List<KeyValue<Windowed<String>, Long>> expected = asList(
KeyValue.pair(new Windowed<>("aa", new SessionWindow(10, 10)), 2L),
KeyValue.pair(new Windowed<>("b", new SessionWindow(1000, 1000)), 4L),

KeyValue.pair(new Windowed<>("aaa", new SessionWindow(100, 100)), 3L),
KeyValue.pair(new Windowed<>("bb", new SessionWindow(1500, 2000)), 5L)
);
final LinkedList<KeyValue<Windowed<String>, Long>> expected = new LinkedList<>();
expected.add(KeyValue.pair(new Windowed<>("aa", new SessionWindow(10, 10)), 2L));
expected.add(KeyValue.pair(new Windowed<>("aaa", new SessionWindow(100, 100)), 3L));
expected.add(KeyValue.pair(new Windowed<>("aaaa", new SessionWindow(100, 100)), 6L));
expected.add(KeyValue.pair(new Windowed<>("b", new SessionWindow(1000, 1000)), 4L));
expected.add(KeyValue.pair(new Windowed<>("bb", new SessionWindow(1500, 2000)), 5L));

for (final KeyValue<Windowed<String>, Long> kv : expected) {
sessionStore.put(kv.key, kv.value);
Expand All @@ -240,7 +243,11 @@ public void shouldBackwardFetchAllSessionsWithinKeyRange() {
sessionStore.put(new Windowed<>("bbb", new SessionWindow(2500, 3000)), 6L);

try (final KeyValueIterator<Windowed<String>, Long> values = sessionStore.backwardFetch("aa", "bb")) {
assertEquals(new HashSet<>(expected), toSet(values));
assertEquals(toList(expected.descendingIterator()), toList(values));
}

try (final KeyValueIterator<Windowed<String>, Long> values = sessionStore.backwardFindSessions("aa", "bb", 0L, Long.MAX_VALUE)) {
assertEquals(toList(expected.descendingIterator()), toList(values));
}
}

Expand Down Expand Up @@ -272,7 +279,7 @@ public void shouldFindValuesWithinMergingSessionWindowRange() {
KeyValue.pair(new Windowed<>(key, new SessionWindow(1000L, 1000L)), 2L));

try (final KeyValueIterator<Windowed<String>, Long> results = sessionStore.findSessions(key, -1, 1000L)) {
assertEquals(new HashSet<>(expected), toSet(results));
assertEquals(expected, toList(results));
}
}

Expand All @@ -282,13 +289,12 @@ public void shouldBackwardFindValuesWithinMergingSessionWindowRange() {
sessionStore.put(new Windowed<>(key, new SessionWindow(0L, 0L)), 1L);
sessionStore.put(new Windowed<>(key, new SessionWindow(1000L, 1000L)), 2L);

final List<KeyValue<Windowed<String>, Long>> expected = asList(
KeyValue.pair(new Windowed<>(key, new SessionWindow(0L, 0L)), 1L),
KeyValue.pair(new Windowed<>(key, new SessionWindow(1000L, 1000L)), 2L)
);
final LinkedList<KeyValue<Windowed<String>, Long>> expected = new LinkedList<>();
expected.add(KeyValue.pair(new Windowed<>(key, new SessionWindow(0L, 0L)), 1L));
expected.add(KeyValue.pair(new Windowed<>(key, new SessionWindow(1000L, 1000L)), 2L));

try (final KeyValueIterator<Windowed<String>, Long> results = sessionStore.backwardFindSessions(key, -1, 1000L)) {
assertEquals(new HashSet<>(expected), toSet(results));
assertEquals(toList(expected.descendingIterator()), toList(results));
}
}

Expand Down Expand Up @@ -341,7 +347,7 @@ public void shouldFindSessionsToMerge() {
Arrays.asList(KeyValue.pair(session2, 2L), KeyValue.pair(session3, 3L));

try (final KeyValueIterator<Windowed<String>, Long> results = sessionStore.findSessions("a", 150, 300)) {
assertEquals(new HashSet<>(expected), toSet(results));
assertEquals(expected, toList(results));
}
}

Expand All @@ -359,10 +365,10 @@ public void shouldBackwardFindSessionsToMerge() {
sessionStore.put(session5, 5L);

final List<KeyValue<Windowed<String>, Long>> expected =
asList(KeyValue.pair(session2, 2L), KeyValue.pair(session3, 3L));
asList(KeyValue.pair(session3, 3L), KeyValue.pair(session2, 2L));

try (final KeyValueIterator<Windowed<String>, Long> results = sessionStore.backwardFindSessions("a", 150, 300)) {
assertEquals(new HashSet<>(expected), toSet(results));
assertEquals(expected, toList(results));
}
}

Expand Down Expand Up @@ -400,7 +406,7 @@ public void shouldFetchExactKeys() {
try (final KeyValueIterator<Windowed<String>, Long> iterator =
sessionStore.findSessions("a", "aa", 10, 0)
) {
assertThat(valuesToSet(iterator), equalTo(new HashSet<>(Collections.singletonList(2L))));
assertThat(valuesToSet(iterator), equalTo(new HashSet<>(asList(2L))));
}
}

Expand Down Expand Up @@ -438,7 +444,7 @@ public void shouldBackwardFetchExactKeys() {
try (final KeyValueIterator<Windowed<String>, Long> iterator =
sessionStore.backwardFindSessions("a", "aa", 10, 0)
) {
assertThat(valuesToSet(iterator), equalTo(new HashSet<>(Collections.singletonList(2L))));
assertThat(valuesToSet(iterator), equalTo(new HashSet<>(asList(2L))));
}
}

Expand All @@ -463,12 +469,20 @@ public void shouldFetchAndIterateOverExactBinaryKeys() {
sessionStore.put(new Windowed<>(key2, new SessionWindow(8, 100)), "8");
sessionStore.put(new Windowed<>(key3, new SessionWindow(9, 100)), "9");

final Set<String> expectedKey1 = new HashSet<>(asList("1", "4", "7"));
assertThat(valuesToSet(sessionStore.findSessions(key1, 0L, Long.MAX_VALUE)), equalTo(expectedKey1));
final Set<String> expectedKey2 = new HashSet<>(asList("2", "5", "8"));
assertThat(valuesToSet(sessionStore.findSessions(key2, 0L, Long.MAX_VALUE)), equalTo(expectedKey2));
final Set<String> expectedKey3 = new HashSet<>(asList("3", "6", "9"));
assertThat(valuesToSet(sessionStore.findSessions(key3, 0L, Long.MAX_VALUE)), equalTo(expectedKey3));
final List<String> expectedKey1 = asList("1", "4", "7");
try (KeyValueIterator<Windowed<Bytes>, String> iterator = sessionStore.findSessions(key1, 0L, Long.MAX_VALUE)) {
assertThat(valuesToSet(iterator), equalTo(new HashSet<>(expectedKey1)));
}
Comment on lines -466 to +475
Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We forgot to close the KeyValueIterator instance returned by the findSessions. Fix it.


final List<String> expectedKey2 = asList("2", "5", "8");
try (KeyValueIterator<Windowed<Bytes>, String> iterator = sessionStore.findSessions(key2, 0L, Long.MAX_VALUE)) {
assertThat(valuesToSet(iterator), equalTo(new HashSet<>(expectedKey2)));
}

final List<String> expectedKey3 = asList("3", "6", "9");
try (KeyValueIterator<Windowed<Bytes>, String> iterator = sessionStore.findSessions(key3, 0L, Long.MAX_VALUE)) {
assertThat(valuesToSet(iterator), equalTo(new HashSet<>(expectedKey3)));
}

sessionStore.close();
}
Expand All @@ -494,12 +508,21 @@ public void shouldBackwardFetchAndIterateOverExactBinaryKeys() {
sessionStore.put(new Windowed<>(key2, new SessionWindow(8, 100)), "8");
sessionStore.put(new Windowed<>(key3, new SessionWindow(9, 100)), "9");

final Set<String> expectedKey1 = new HashSet<>(asList("1", "4", "7"));
assertThat(valuesToSet(sessionStore.backwardFindSessions(key1, 0L, Long.MAX_VALUE)), equalTo(expectedKey1));
final Set<String> expectedKey2 = new HashSet<>(asList("2", "5", "8"));
assertThat(valuesToSet(sessionStore.backwardFindSessions(key2, 0L, Long.MAX_VALUE)), equalTo(expectedKey2));
final Set<String> expectedKey3 = new HashSet<>(asList("3", "6", "9"));
assertThat(valuesToSet(sessionStore.backwardFindSessions(key3, 0L, Long.MAX_VALUE)), equalTo(expectedKey3));

final List<String> expectedKey1 = asList("7", "4", "1");
try (KeyValueIterator<Windowed<Bytes>, String> iterator = sessionStore.backwardFindSessions(key1, 0L, Long.MAX_VALUE)) {
assertThat(valuesToSet(iterator), equalTo(new HashSet<>(expectedKey1)));
}

final List<String> expectedKey2 = asList("8", "5", "2");
try (KeyValueIterator<Windowed<Bytes>, String> iterator = sessionStore.backwardFindSessions(key2, 0L, Long.MAX_VALUE)) {
assertThat(valuesToSet(iterator), equalTo(new HashSet<>(expectedKey2)));
}

final List<String> expectedKey3 = asList("9", "6", "3");
try (KeyValueIterator<Windowed<Bytes>, String> iterator = sessionStore.backwardFindSessions(key3, 0L, Long.MAX_VALUE)) {
assertThat(valuesToSet(iterator), equalTo(new HashSet<>(expectedKey3)));
}

sessionStore.close();
}
Expand Down Expand Up @@ -550,13 +573,13 @@ public void shouldRestore() {
}

try (final KeyValueIterator<Windowed<String>, Long> values = sessionStore.fetch("a")) {
assertEquals(new HashSet<>(expected), toSet(values));
assertEquals(expected, toList(values));
}

sessionStore.close();

try (final KeyValueIterator<Windowed<String>, Long> values = sessionStore.fetch("a")) {
assertEquals(Collections.emptySet(), toSet(values));
assertEquals(Collections.emptyList(), toList(values));
}


Expand All @@ -568,7 +591,7 @@ public void shouldRestore() {
context.restore(sessionStore.name(), changeLog);

try (final KeyValueIterator<Windowed<String>, Long> values = sessionStore.fetch("a")) {
assertEquals(new HashSet<>(expected), toSet(values));
assertEquals(expected, toList(values));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -334,7 +334,7 @@ public void shouldRemove() {
cachingStore.remove(a);

try (final KeyValueIterator<Windowed<Bytes>, byte[]> rangeIter =
cachingStore.findSessions(keyA, 0, 0)) {
cachingStore.findSessions(keyA, 0, 0)) {
assertFalse(rangeIter.hasNext());

assertNull(cachingStore.fetchSession(keyA, 0, 0));
Expand Down
Loading