From 45db1f68df95d09a5ff28471473a2fbb7d2613cd Mon Sep 17 00:00:00 2001 From: Luke Chen Date: Wed, 18 Aug 2021 20:58:57 +0800 Subject: [PATCH 1/2] KAFKA-13212: add support infinite query for session store --- .../streams/state/ReadOnlySessionStore.java | 17 ++- .../AbstractRocksDBSegmentedBytesStore.java | 6 +- .../state/internals/CachingSessionStore.java | 30 ++-- .../CompositeReadOnlySessionStore.java | 8 -- .../state/internals/InMemorySessionStore.java | 56 ++++---- .../state/internals/MeteredSessionStore.java | 16 +-- .../state/internals/SegmentIterator.java | 2 +- .../AbstractSessionBytesStoreTest.java | 129 +++++++++++++----- .../CachingInMemorySessionStoreTest.java | 128 +++++++++++++---- .../CachingPersistentSessionStoreTest.java | 128 ++++++++++++++--- .../CompositeReadOnlySessionStoreTest.java | 45 +++++- .../kafka/test/ReadOnlySessionStoreStub.java | 31 ++++- 12 files changed, 423 insertions(+), 173 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/state/ReadOnlySessionStore.java b/streams/src/main/java/org/apache/kafka/streams/state/ReadOnlySessionStore.java index 0ade24286fdd6..7886c989b958c 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/ReadOnlySessionStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/ReadOnlySessionStore.java @@ -126,14 +126,15 @@ default KeyValueIterator, AGG> backwardFindSessions(final K key, * This iterator must be closed after use. * * @param keyFrom The first key that could be in the range + * A null value indicates a starting position from the first element in the store. * @param keyTo The last key that could be in the range + * A null value indicates that the range ends with the last element in the store. * @param earliestSessionEndTime the end timestamp of the earliest session to search for, where * iteration starts. * @param latestSessionStartTime the end timestamp of the latest session to search for, where * iteration ends. * @return iterator of sessions with the matching keys and aggregated values, from earliest to * latest session time. - * @throws NullPointerException If null is used for any key. */ default KeyValueIterator, AGG> findSessions(final K keyFrom, final K keyTo, @@ -151,14 +152,15 @@ default KeyValueIterator, AGG> findSessions(final K keyFrom, * This iterator must be closed after use. * * @param keyFrom The first key that could be in the range + * A null value indicates a starting position from the first element in the store. * @param keyTo The last key that could be in the range + * A null value indicates that the range ends with the last element in the store. * @param earliestSessionEndTime the end timestamp of the earliest session to search for, where * iteration starts. * @param latestSessionStartTime the end timestamp of the latest session to search for, where * iteration ends. * @return iterator of sessions with the matching keys and aggregated values, from earliest to * latest session time. - * @throws NullPointerException If null is used for any key. */ default KeyValueIterator, AGG> findSessions(final K keyFrom, final K keyTo, @@ -176,14 +178,15 @@ default KeyValueIterator, AGG> findSessions(final K keyFrom, * This iterator must be closed after use. * * @param keyFrom The first key that could be in the range + * A null value indicates a starting position from the first element in the store. * @param keyTo The last key that could be in the range + * A null value indicates that the range ends with the last element in the store. * @param earliestSessionEndTime the end timestamp of the earliest session to search for, where * iteration ends. * @param latestSessionStartTime the end timestamp of the latest session to search for, where * iteration starts. * @return backward iterator of sessions with the matching keys and aggregated values, from * latest to earliest session time. - * @throws NullPointerException If null is used for any key. */ default KeyValueIterator, AGG> backwardFindSessions(final K keyFrom, final K keyTo, @@ -201,14 +204,15 @@ default KeyValueIterator, AGG> backwardFindSessions(final K keyFrom, * This iterator must be closed after use. * * @param keyFrom The first key that could be in the range + * A null value indicates a starting position from the first element in the store. * @param keyTo The last key that could be in the range + * A null value indicates that the range ends with the last element in the store. * @param earliestSessionEndTime the end timestamp of the earliest session to search for, where * iteration ends. * @param latestSessionStartTime the end timestamp of the latest session to search for, where * iteration starts. * @return backward iterator of sessions with the matching keys and aggregated values, from * latest to earliest session time. - * @throws NullPointerException If null is used for any key. */ default KeyValueIterator, AGG> backwardFindSessions(final K keyFrom, final K keyTo, @@ -289,10 +293,11 @@ default KeyValueIterator, AGG> backwardFetch(final K key) { * available session to the newest/latest session. * * @param keyFrom first key in the range to find aggregated session values for + * A null value indicates a starting position from the first element in the store. * @param keyTo last key in the range to find aggregated session values for + * A null value indicates that the range ends with the last element in the store. * @return KeyValueIterator containing all sessions for the provided key, from oldest to newest * session. - * @throws NullPointerException If null is used for any of the keys. */ KeyValueIterator, AGG> fetch(final K keyFrom, final K keyTo); @@ -304,7 +309,9 @@ default KeyValueIterator, AGG> backwardFetch(final K key) { * available session to the oldest/earliest session. * * @param keyFrom first key in the range to find aggregated session values for + * A null value indicates a starting position from the first element in the store. * @param keyTo last key in the range to find aggregated session values for + * A null value indicates that the range ends with the last element in the store. * @return backward KeyValueIterator containing all sessions for the provided key, from newest * to oldest session. * @throws NullPointerException If null is used for any of the keys. diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStore.java index f7aef116a91ca..bfee6b2754a8f 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStore.java @@ -114,7 +114,7 @@ KeyValueIterator fetch(final Bytes keyFrom, final long from, final long to, final boolean forward) { - if (keyFrom.compareTo(keyTo) > 0) { + if (keyFrom != null && keyTo != null && keyFrom.compareTo(keyTo) > 0) { LOG.warn("Returning empty iterator for fetch with invalid key range: from > to. " + "This may be due to range arguments set in the wrong order, " + "or serdes that don't preserve ordering when lexicographically comparing the serialized bytes. " + @@ -124,8 +124,8 @@ KeyValueIterator fetch(final Bytes keyFrom, final List searchSpace = keySchema.segmentsToSearch(segments, from, to, forward); - final Bytes binaryFrom = keySchema.lowerRange(keyFrom, from); - final Bytes binaryTo = keySchema.upperRange(keyTo, to); + final Bytes binaryFrom = keyFrom == null ? null : keySchema.lowerRange(keyFrom, from); + final Bytes binaryTo = keyTo == null ? null : keySchema.upperRange(keyTo, to); return new SegmentIterator<>( searchSpace.iterator(), diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingSessionStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingSessionStore.java index 1cfb8ce498a64..9b07fe82e6d9d 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingSessionStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingSessionStore.java @@ -47,6 +47,10 @@ class CachingSessionStore private final SessionKeySchema keySchema; private final SegmentedCacheFunction cacheFunction; + private static final String INVALID_RANGE_WARN_MSG = "Returning empty iterator for fetch with invalid key range: from > to. " + + "This may be due to range arguments set in the wrong order, " + + "or serdes that don't preserve ordering when lexicographically comparing the serialized bytes. " + + "Note that the built-in numerical serdes do not follow this for negative numbers"; private String cacheName; private InternalProcessorContext context; @@ -212,18 +216,15 @@ public KeyValueIterator, byte[]> findSessions(final Bytes keyFro final Bytes keyTo, final long earliestSessionEndTime, final long latestSessionStartTime) { - if (keyFrom.compareTo(keyTo) > 0) { - LOG.warn("Returning empty iterator for fetch with invalid key range: from > to. " + - "This may be due to range arguments set in the wrong order, " + - "or serdes that don't preserve ordering when lexicographically comparing the serialized bytes. " + - "Note that the built-in numerical serdes do not follow this for negative numbers"); + if (keyFrom != null && keyTo != null && keyFrom.compareTo(keyTo) > 0) { + LOG.warn(INVALID_RANGE_WARN_MSG); return KeyValueIterators.emptyIterator(); } validateStoreOpen(); - final Bytes cacheKeyFrom = cacheFunction.cacheKey(keySchema.lowerRange(keyFrom, earliestSessionEndTime)); - final Bytes cacheKeyTo = cacheFunction.cacheKey(keySchema.upperRange(keyTo, latestSessionStartTime)); + final Bytes cacheKeyFrom = keyFrom == null ? null : cacheFunction.cacheKey(keySchema.lowerRange(keyFrom, earliestSessionEndTime)); + final Bytes cacheKeyTo = keyTo == null ? null : cacheFunction.cacheKey(keySchema.upperRange(keyTo, latestSessionStartTime)); final ThreadCache.MemoryLRUCacheBytesIterator cacheIterator = context.cache().range(cacheName, cacheKeyFrom, cacheKeyTo); final KeyValueIterator, byte[]> storeIterator = wrapped().findSessions( @@ -243,18 +244,15 @@ public KeyValueIterator, byte[]> backwardFindSessions(final Byte final Bytes keyTo, final long earliestSessionEndTime, final long latestSessionStartTime) { - if (keyFrom.compareTo(keyTo) > 0) { - LOG.warn("Returning empty iterator for fetch with invalid key range: from > to. " + - "This may be due to range arguments set in the wrong order, " + - "or serdes that don't preserve ordering when lexicographically comparing the serialized bytes. " + - "Note that the built-in numerical serdes do not follow this for negative numbers"); + if (keyFrom != null && keyTo != null && keyFrom.compareTo(keyTo) > 0) { + LOG.warn(INVALID_RANGE_WARN_MSG); return KeyValueIterators.emptyIterator(); } validateStoreOpen(); - final Bytes cacheKeyFrom = cacheFunction.cacheKey(keySchema.lowerRange(keyFrom, earliestSessionEndTime)); - final Bytes cacheKeyTo = cacheFunction.cacheKey(keySchema.upperRange(keyTo, latestSessionStartTime)); + final Bytes cacheKeyFrom = keyFrom == null ? null : cacheFunction.cacheKey(keySchema.lowerRange(keyFrom, earliestSessionEndTime)); + final Bytes cacheKeyTo = keyTo == null ? null : cacheFunction.cacheKey(keySchema.upperRange(keyTo, latestSessionStartTime)); final ThreadCache.MemoryLRUCacheBytesIterator cacheIterator = context.cache().reverseRange(cacheName, cacheKeyFrom, cacheKeyTo); final KeyValueIterator, byte[]> storeIterator = @@ -304,16 +302,12 @@ public KeyValueIterator, byte[]> backwardFetch(final Bytes key) @Override public KeyValueIterator, byte[]> fetch(final Bytes keyFrom, final Bytes keyTo) { - Objects.requireNonNull(keyFrom, "keyFrom cannot be null"); - Objects.requireNonNull(keyTo, "keyTo cannot be null"); return findSessions(keyFrom, keyTo, 0, Long.MAX_VALUE); } @Override public KeyValueIterator, byte[]> backwardFetch(final Bytes keyFrom, final Bytes keyTo) { - Objects.requireNonNull(keyFrom, "keyFrom cannot be null"); - Objects.requireNonNull(keyTo, "keyTo cannot be null"); return backwardFindSessions(keyFrom, keyTo, 0, Long.MAX_VALUE); } diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/CompositeReadOnlySessionStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/CompositeReadOnlySessionStore.java index fb5fb61ae6108..0f153cc4bf545 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/CompositeReadOnlySessionStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/CompositeReadOnlySessionStore.java @@ -101,8 +101,6 @@ public KeyValueIterator, V> findSessions(final K keyFrom, final K keyTo, final long earliestSessionEndTime, final long latestSessionStartTime) { - Objects.requireNonNull(keyFrom, "from can't be null"); - Objects.requireNonNull(keyTo, "to can't be null"); final List> stores = storeProvider.stores(storeName, queryableStoreType); for (final ReadOnlySessionStore store : stores) { try { @@ -130,8 +128,6 @@ public KeyValueIterator, V> backwardFindSessions(final K keyFrom, final K keyTo, final long earliestSessionEndTime, final long latestSessionStartTime) { - Objects.requireNonNull(keyFrom, "from can't be null"); - Objects.requireNonNull(keyTo, "to can't be null"); final List> stores = storeProvider.stores(storeName, queryableStoreType); for (final ReadOnlySessionStore store : stores) { try { @@ -221,8 +217,6 @@ public KeyValueIterator, V> backwardFetch(final K key) { @Override public KeyValueIterator, V> fetch(final K keyFrom, final K keyTo) { - Objects.requireNonNull(keyFrom, "keyFrom can't be null"); - Objects.requireNonNull(keyTo, "keyTo can't be null"); final NextIteratorFunction, V, ReadOnlySessionStore> nextIteratorFunction = store -> store.fetch(keyFrom, keyTo); return new DelegatingPeekingKeyValueIterator<>(storeName, @@ -233,8 +227,6 @@ public KeyValueIterator, V> fetch(final K keyFrom, final K keyTo) { @Override public KeyValueIterator, V> backwardFetch(final K keyFrom, final K keyTo) { - Objects.requireNonNull(keyFrom, "keyFrom can't be null"); - Objects.requireNonNull(keyTo, "keyTo can't be null"); final NextIteratorFunction, V, ReadOnlySessionStore> nextIteratorFunction = store -> store.backwardFetch(keyFrom, keyTo); return new DelegatingPeekingKeyValueIterator<>( diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemorySessionStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemorySessionStore.java index 722ed43ff0cae..7785ee808395c 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemorySessionStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemorySessionStore.java @@ -54,6 +54,12 @@ public class InMemorySessionStore implements SessionStore { private final long retentionPeriod; + private final static String INVALID_RANGE_WARN_MSG = + "Returning empty iterator for fetch with invalid key range: from > to. " + + "This may be due to range arguments set in the wrong order, " + + "or serdes that don't preserve ordering when lexicographically comparing the serialized bytes. " + + "Note that the built-in numerical serdes do not follow this for negative numbers"; + private final ConcurrentNavigableMap>> endTimeMap = new ConcurrentSkipListMap<>(); private final Set openIterators = ConcurrentHashMap.newKeySet(); @@ -205,16 +211,10 @@ public KeyValueIterator, byte[]> findSessions(final Bytes keyFro final Bytes keyTo, final long earliestSessionEndTime, final long latestSessionStartTime) { - Objects.requireNonNull(keyFrom, "from key cannot be null"); - Objects.requireNonNull(keyTo, "to key cannot be null"); - removeExpiredSegments(); - if (keyFrom.compareTo(keyTo) > 0) { - LOG.warn("Returning empty iterator for fetch with invalid key range: from > to. " + - "This may be due to range arguments set in the wrong order, " + - "or serdes that don't preserve ordering when lexicographically comparing the serialized bytes. " + - "Note that the built-in numerical serdes do not follow this for negative numbers"); + if (keyFrom != null && keyTo != null && keyFrom.compareTo(keyTo) > 0) { + LOG.warn(INVALID_RANGE_WARN_MSG); return KeyValueIterators.emptyIterator(); } @@ -230,16 +230,10 @@ public KeyValueIterator, byte[]> backwardFindSessions(final Byte final Bytes keyTo, final long earliestSessionEndTime, final long latestSessionStartTime) { - Objects.requireNonNull(keyFrom, "from key cannot be null"); - Objects.requireNonNull(keyTo, "to key cannot be null"); - removeExpiredSegments(); - if (keyFrom.compareTo(keyTo) > 0) { - LOG.warn("Returning empty iterator for fetch with invalid key range: from > to. " + - "This may be due to range arguments set in the wrong order, " + - "or serdes that don't preserve ordering when lexicographically comparing the serialized bytes. " + - "Note that the built-in numerical serdes do not follow this for negative numbers"); + if (keyFrom != null && keyTo != null && keyFrom.compareTo(keyTo) > 0) { + LOG.warn(INVALID_RANGE_WARN_MSG); return KeyValueIterators.emptyIterator(); } @@ -275,20 +269,13 @@ public KeyValueIterator, byte[]> backwardFetch(final Bytes key) @Override public KeyValueIterator, byte[]> fetch(final Bytes keyFrom, final Bytes keyTo) { - Objects.requireNonNull(keyFrom, "from key cannot be null"); - Objects.requireNonNull(keyTo, "to key cannot be null"); - removeExpiredSegments(); - return registerNewIterator(keyFrom, keyTo, Long.MAX_VALUE, endTimeMap.entrySet().iterator(), false); } @Override public KeyValueIterator, byte[]> backwardFetch(final Bytes keyFrom, final Bytes keyTo) { - Objects.requireNonNull(keyFrom, "from key cannot be null"); - Objects.requireNonNull(keyTo, "to key cannot be null"); - removeExpiredSegments(); return registerNewIterator( @@ -457,17 +444,22 @@ private void setAllIterators() { while (endTimeIterator.hasNext()) { final Entry>> nextEndTimeEntry = endTimeIterator.next(); currentEndTime = nextEndTimeEntry.getKey(); + + final ConcurrentNavigableMap> subKVMap; + if (keyFrom == null && keyTo == null) { + subKVMap = nextEndTimeEntry.getValue(); + } else if (keyFrom == null) { + subKVMap = nextEndTimeEntry.getValue().headMap(keyTo, true); + } else if (keyTo == null) { + subKVMap = nextEndTimeEntry.getValue().tailMap(keyFrom, true); + } else { + subKVMap = nextEndTimeEntry.getValue().subMap(keyFrom, true, keyTo, true); + } + if (forward) { - keyIterator = nextEndTimeEntry.getValue() - .subMap(keyFrom, true, keyTo, true) - .entrySet() - .iterator(); + keyIterator = subKVMap.entrySet().iterator(); } else { - keyIterator = nextEndTimeEntry.getValue() - .subMap(keyFrom, true, keyTo, true) - .descendingMap() - .entrySet() - .iterator(); + keyIterator = subKVMap.descendingMap().entrySet().iterator(); } if (setInnerIterators()) { diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java index f7178999b0fd5..529f6165ca7ab 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java @@ -265,8 +265,6 @@ public KeyValueIterator, V> backwardFetch(final K key) { @Override public KeyValueIterator, V> fetch(final K keyFrom, final K keyTo) { - Objects.requireNonNull(keyFrom, "keyFrom cannot be null"); - Objects.requireNonNull(keyTo, "keyTo cannot be null"); return new MeteredWindowedKeyValueIterator<>( wrapped().fetch(keyBytes(keyFrom), keyBytes(keyTo)), fetchSensor, @@ -278,8 +276,6 @@ public KeyValueIterator, V> fetch(final K keyFrom, @Override public KeyValueIterator, V> backwardFetch(final K keyFrom, final K keyTo) { - Objects.requireNonNull(keyFrom, "keyFrom cannot be null"); - Objects.requireNonNull(keyTo, "keyTo cannot be null"); return new MeteredWindowedKeyValueIterator<>( wrapped().backwardFetch(keyBytes(keyFrom), keyBytes(keyTo)), fetchSensor, @@ -330,10 +326,8 @@ public KeyValueIterator, V> findSessions(final K keyFrom, final K keyTo, final long earliestSessionEndTime, final long latestSessionStartTime) { - Objects.requireNonNull(keyFrom, "keyFrom cannot be null"); - Objects.requireNonNull(keyTo, "keyTo cannot be null"); - final Bytes bytesKeyFrom = keyBytes(keyFrom); - final Bytes bytesKeyTo = keyBytes(keyTo); + final Bytes bytesKeyFrom = keyFrom == null ? null : keyBytes(keyFrom); + final Bytes bytesKeyTo = keyTo == null ? null : keyBytes(keyTo); return new MeteredWindowedKeyValueIterator<>( wrapped().findSessions( bytesKeyFrom, @@ -351,10 +345,8 @@ public KeyValueIterator, V> backwardFindSessions(final K keyFrom, final K keyTo, final long earliestSessionEndTime, final long latestSessionStartTime) { - Objects.requireNonNull(keyFrom, "keyFrom cannot be null"); - Objects.requireNonNull(keyTo, "keyTo cannot be null"); - final Bytes bytesKeyFrom = keyBytes(keyFrom); - final Bytes bytesKeyTo = keyBytes(keyTo); + final Bytes bytesKeyFrom = keyFrom == null ? null : keyBytes(keyFrom); + final Bytes bytesKeyTo = keyTo == null ? null : keyBytes(keyTo); return new MeteredWindowedKeyValueIterator<>( wrapped().backwardFindSessions( bytesKeyFrom, diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/SegmentIterator.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/SegmentIterator.java index 03b66a634f2db..d84a1b958b14c 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/SegmentIterator.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/SegmentIterator.java @@ -74,7 +74,7 @@ public boolean hasNext() { close(); currentSegment = segments.next(); try { - if (from == null || to == null) { + if (from == null && to == null) { if (forward) { currentIterator = currentSegment.all(); } else { diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractSessionBytesStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractSessionBytesStoreTest.java index ba203bdff093a..5f3ad61b1f95f 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractSessionBytesStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractSessionBytesStoreTest.java @@ -201,47 +201,90 @@ public void shouldBackwardFetchAllSessionsWithSameRecordKey() { @Test public void shouldFetchAllSessionsWithinKeyRange() { - final List, Long>> expected = Arrays.asList( - KeyValue.pair(new Windowed<>("aa", new SessionWindow(10, 10)), 2L), - KeyValue.pair(new Windowed<>("b", new SessionWindow(1000, 1000)), 4L), - - KeyValue.pair(new Windowed<>("aaa", new SessionWindow(100, 100)), 3L), - KeyValue.pair(new Windowed<>("bb", new SessionWindow(1500, 2000)), 5L)); + final List, Long>> expected = new ArrayList<>(); + expected.add(KeyValue.pair(new Windowed<>("aa", new SessionWindow(10, 10)), 2L)); + expected.add(KeyValue.pair(new Windowed<>("b", new SessionWindow(1000, 1000)), 4L)); + expected.add(KeyValue.pair(new Windowed<>("aaa", new SessionWindow(100, 100)), 3L)); + expected.add(KeyValue.pair(new Windowed<>("bb", new SessionWindow(1500, 2000)), 5L)); for (final KeyValue, Long> kv : expected) { sessionStore.put(kv.key, kv.value); } - // add some that shouldn't appear in the results + // add some that should only be fetched in unlimited fetch sessionStore.put(new Windowed<>("a", new SessionWindow(0, 0)), 1L); sessionStore.put(new Windowed<>("bbb", new SessionWindow(2500, 3000)), 6L); try (final KeyValueIterator, Long> values = sessionStore.fetch("aa", "bb")) { assertEquals(new HashSet<>(expected), toSet(values)); } + + // unlimited keyFrom fetch case + expected.add(KeyValue.pair(new Windowed<>("a", new SessionWindow(0, 0)), 1L)); + + try (final KeyValueIterator, Long> values = sessionStore.fetch(null, "bb")) { + assertEquals(new HashSet<>(expected), toSet(values)); + } + + // remove the one added for unlimited start fetch case + expected.remove(expected.size() - 1); + // unlimited keyTo fetch case + expected.add(KeyValue.pair(new Windowed<>("bbb", new SessionWindow(2500, 3000)), 6L)); + + try (final KeyValueIterator, Long> values = sessionStore.fetch("aa", null)) { + assertEquals(new HashSet<>(expected), toSet(values)); + } + + // fetch all case + expected.add(KeyValue.pair(new Windowed<>("a", new SessionWindow(0, 0)), 1L)); + + try (final KeyValueIterator, Long> values = sessionStore.fetch(null, null)) { + assertEquals(new HashSet<>(expected), toSet(values)); + } } @Test public void shouldBackwardFetchAllSessionsWithinKeyRange() { - final List, Long>> expected = asList( - KeyValue.pair(new Windowed<>("aa", new SessionWindow(10, 10)), 2L), - KeyValue.pair(new Windowed<>("b", new SessionWindow(1000, 1000)), 4L), - - KeyValue.pair(new Windowed<>("aaa", new SessionWindow(100, 100)), 3L), - KeyValue.pair(new Windowed<>("bb", new SessionWindow(1500, 2000)), 5L) - ); + final List, Long>> expected = new ArrayList<>(); + expected.add(KeyValue.pair(new Windowed<>("aa", new SessionWindow(10, 10)), 2L)); + expected.add(KeyValue.pair(new Windowed<>("b", new SessionWindow(1000, 1000)), 4L)); + expected.add(KeyValue.pair(new Windowed<>("aaa", new SessionWindow(100, 100)), 3L)); + expected.add(KeyValue.pair(new Windowed<>("bb", new SessionWindow(1500, 2000)), 5L)); for (final KeyValue, Long> kv : expected) { sessionStore.put(kv.key, kv.value); } - // add some that shouldn't appear in the results + // add some that should only be fetched in unlimited fetch sessionStore.put(new Windowed<>("a", new SessionWindow(0, 0)), 1L); sessionStore.put(new Windowed<>("bbb", new SessionWindow(2500, 3000)), 6L); try (final KeyValueIterator, Long> values = sessionStore.backwardFetch("aa", "bb")) { assertEquals(new HashSet<>(expected), toSet(values)); } + + // unlimited keyFrom fetch case + expected.add(KeyValue.pair(new Windowed<>("a", new SessionWindow(0, 0)), 1L)); + + try (final KeyValueIterator, Long> values = sessionStore.backwardFetch(null, "bb")) { + assertEquals(new HashSet<>(expected), toSet(values)); + } + + // remove the one added for unlimited start fetch case + expected.remove(expected.size() - 1); + // unlimited keyTo fetch case + expected.add(KeyValue.pair(new Windowed<>("bbb", new SessionWindow(2500, 3000)), 6L)); + + try (final KeyValueIterator, Long> values = sessionStore.backwardFetch("aa", null)) { + assertEquals(new HashSet<>(expected), toSet(values)); + } + + // fetch all case + expected.add(KeyValue.pair(new Windowed<>("a", new SessionWindow(0, 0)), 1L)); + + try (final KeyValueIterator, Long> values = sessionStore.backwardFetch(null, null)) { + assertEquals(new HashSet<>(expected), toSet(values)); + } } @Test @@ -402,6 +445,24 @@ public void shouldFetchExactKeys() { ) { assertThat(valuesToSet(iterator), equalTo(new HashSet<>(Collections.singletonList(2L)))); } + + try (final KeyValueIterator, Long> iterator = + sessionStore.findSessions(null, "aa", 0, Long.MAX_VALUE) + ) { + assertThat(valuesToSet(iterator), equalTo(new HashSet<>(asList(1L, 2L, 3L, 4L, 5L)))); + } + + try (final KeyValueIterator, Long> iterator = + sessionStore.findSessions("a", null, 0, Long.MAX_VALUE) + ) { + assertThat(valuesToSet(iterator), equalTo(new HashSet<>(asList(1L, 2L, 3L, 4L, 5L)))); + } + + try (final KeyValueIterator, Long> iterator = + sessionStore.findSessions(null, null, 0, Long.MAX_VALUE) + ) { + assertThat(valuesToSet(iterator), equalTo(new HashSet<>(asList(1L, 2L, 3L, 4L, 5L)))); + } } @Test @@ -440,6 +501,24 @@ public void shouldBackwardFetchExactKeys() { ) { assertThat(valuesToSet(iterator), equalTo(new HashSet<>(Collections.singletonList(2L)))); } + + try (final KeyValueIterator, Long> iterator = + sessionStore.backwardFindSessions(null, "aa", 0, Long.MAX_VALUE) + ) { + assertThat(valuesToSet(iterator), equalTo(new HashSet<>(asList(1L, 2L, 3L, 4L, 5L)))); + } + + try (final KeyValueIterator, Long> iterator = + sessionStore.backwardFindSessions("a", null, 0, Long.MAX_VALUE) + ) { + assertThat(valuesToSet(iterator), equalTo(new HashSet<>(asList(1L, 2L, 3L, 4L, 5L)))); + } + + try (final KeyValueIterator, Long> iterator = + sessionStore.backwardFindSessions(null, null, 0, Long.MAX_VALUE) + ) { + assertThat(valuesToSet(iterator), equalTo(new HashSet<>(asList(1L, 2L, 3L, 4L, 5L)))); + } } @Test @@ -669,26 +748,6 @@ public void shouldThrowNullPointerExceptionOnFindSessionsNullKey() { assertThrows(NullPointerException.class, () -> sessionStore.findSessions(null, 1L, 2L)); } - @Test - public void shouldThrowNullPointerExceptionOnFindSessionsNullFromKey() { - assertThrows(NullPointerException.class, () -> sessionStore.findSessions(null, "anyKeyTo", 1L, 2L)); - } - - @Test - public void shouldThrowNullPointerExceptionOnFindSessionsNullToKey() { - assertThrows(NullPointerException.class, () -> sessionStore.findSessions("anyKeyFrom", null, 1L, 2L)); - } - - @Test - public void shouldThrowNullPointerExceptionOnFetchNullFromKey() { - assertThrows(NullPointerException.class, () -> sessionStore.fetch(null, "anyToKey")); - } - - @Test - public void shouldThrowNullPointerExceptionOnFetchNullToKey() { - assertThrows(NullPointerException.class, () -> sessionStore.fetch("anyFromKey", null)); - } - @Test public void shouldThrowNullPointerExceptionOnFetchNullKey() { assertThrows(NullPointerException.class, () -> sessionStore.fetch(null)); diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingInMemorySessionStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingInMemorySessionStoreTest.java index 977504628762f..6dbdef836305b 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingInMemorySessionStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingInMemorySessionStoreTest.java @@ -150,7 +150,31 @@ public void shouldPutFetchAllKeysFromCache() { assertEquals(3, cache.size()); - try (final KeyValueIterator, byte[]> all = cachingStore.findSessions(keyA, keyB, 0, 0)) { + try (final KeyValueIterator, byte[]> all = cachingStore.fetch(keyA, keyB)) { + verifyWindowedKeyValue(all.next(), new Windowed<>(keyA, new SessionWindow(0, 0)), "1"); + verifyWindowedKeyValue(all.next(), new Windowed<>(keyAA, new SessionWindow(0, 0)), "1"); + verifyWindowedKeyValue(all.next(), new Windowed<>(keyB, new SessionWindow(0, 0)), "1"); + assertFalse(all.hasNext()); + } + + // infinite keyFrom fetch + try (final KeyValueIterator, byte[]> all = cachingStore.fetch(null, keyB)) { + verifyWindowedKeyValue(all.next(), new Windowed<>(keyA, new SessionWindow(0, 0)), "1"); + verifyWindowedKeyValue(all.next(), new Windowed<>(keyAA, new SessionWindow(0, 0)), "1"); + verifyWindowedKeyValue(all.next(), new Windowed<>(keyB, new SessionWindow(0, 0)), "1"); + assertFalse(all.hasNext()); + } + + // infinite keyTo fetch + try (final KeyValueIterator, byte[]> all = cachingStore.fetch(null, keyB)) { + verifyWindowedKeyValue(all.next(), new Windowed<>(keyA, new SessionWindow(0, 0)), "1"); + verifyWindowedKeyValue(all.next(), new Windowed<>(keyAA, new SessionWindow(0, 0)), "1"); + verifyWindowedKeyValue(all.next(), new Windowed<>(keyB, new SessionWindow(0, 0)), "1"); + assertFalse(all.hasNext()); + } + + // infinite keyFrom and keyTo fetch + try (final KeyValueIterator, byte[]> all = cachingStore.fetch(null, keyB)) { verifyWindowedKeyValue(all.next(), new Windowed<>(keyA, new SessionWindow(0, 0)), "1"); verifyWindowedKeyValue(all.next(), new Windowed<>(keyAA, new SessionWindow(0, 0)), "1"); verifyWindowedKeyValue(all.next(), new Windowed<>(keyB, new SessionWindow(0, 0)), "1"); @@ -166,7 +190,31 @@ public void shouldPutBackwardFetchAllKeysFromCache() { assertEquals(3, cache.size()); - try (final KeyValueIterator, byte[]> all = cachingStore.backwardFindSessions(keyA, keyB, 0, 0)) { + try (final KeyValueIterator, byte[]> all = cachingStore.backwardFetch(keyA, keyB)) { + verifyWindowedKeyValue(all.next(), new Windowed<>(keyB, new SessionWindow(0, 0)), "1"); + verifyWindowedKeyValue(all.next(), new Windowed<>(keyAA, new SessionWindow(0, 0)), "1"); + verifyWindowedKeyValue(all.next(), new Windowed<>(keyA, new SessionWindow(0, 0)), "1"); + assertFalse(all.hasNext()); + } + + // infinite keyFrom fetch + try (final KeyValueIterator, byte[]> all = cachingStore.backwardFetch(null, keyB)) { + verifyWindowedKeyValue(all.next(), new Windowed<>(keyB, new SessionWindow(0, 0)), "1"); + verifyWindowedKeyValue(all.next(), new Windowed<>(keyAA, new SessionWindow(0, 0)), "1"); + verifyWindowedKeyValue(all.next(), new Windowed<>(keyA, new SessionWindow(0, 0)), "1"); + assertFalse(all.hasNext()); + } + + // infinite keyTo fetch + try (final KeyValueIterator, byte[]> all = cachingStore.backwardFetch(null, keyB)) { + verifyWindowedKeyValue(all.next(), new Windowed<>(keyB, new SessionWindow(0, 0)), "1"); + verifyWindowedKeyValue(all.next(), new Windowed<>(keyAA, new SessionWindow(0, 0)), "1"); + verifyWindowedKeyValue(all.next(), new Windowed<>(keyA, new SessionWindow(0, 0)), "1"); + assertFalse(all.hasNext()); + } + + // infinite keyFrom and keyTo fetch + try (final KeyValueIterator, byte[]> all = cachingStore.backwardFetch(null, null)) { verifyWindowedKeyValue(all.next(), new Windowed<>(keyB, new SessionWindow(0, 0)), "1"); verifyWindowedKeyValue(all.next(), new Windowed<>(keyAA, new SessionWindow(0, 0)), "1"); verifyWindowedKeyValue(all.next(), new Windowed<>(keyA, new SessionWindow(0, 0)), "1"); @@ -241,7 +289,33 @@ public void shouldPutFetchRangeFromCache() { assertEquals(3, cache.size()); - try (final KeyValueIterator, byte[]> some = cachingStore.findSessions(keyAA, keyB, 0, 0)) { + try (final KeyValueIterator, byte[]> some = + cachingStore.findSessions(keyAA, keyB, 0, 0)) { + verifyWindowedKeyValue(some.next(), new Windowed<>(keyAA, new SessionWindow(0, 0)), "1"); + verifyWindowedKeyValue(some.next(), new Windowed<>(keyB, new SessionWindow(0, 0)), "1"); + assertFalse(some.hasNext()); + } + + // infinite keyFrom case + try (final KeyValueIterator, byte[]> some = + cachingStore.findSessions(null, keyAA, 0, 0)) { + verifyWindowedKeyValue(some.next(), new Windowed<>(keyA, new SessionWindow(0, 0)), "1"); + verifyWindowedKeyValue(some.next(), new Windowed<>(keyAA, new SessionWindow(0, 0)), "1"); + assertFalse(some.hasNext()); + } + + // infinite keyTo case + try (final KeyValueIterator, byte[]> some = + cachingStore.findSessions(keyAA, keyB, 0, 0)) { + verifyWindowedKeyValue(some.next(), new Windowed<>(keyAA, new SessionWindow(0, 0)), "1"); + verifyWindowedKeyValue(some.next(), new Windowed<>(keyB, new SessionWindow(0, 0)), "1"); + assertFalse(some.hasNext()); + } + + // infinite keyFrom and keyTo case + try (final KeyValueIterator, byte[]> some = + cachingStore.findSessions(null, null, 0, 0)) { + verifyWindowedKeyValue(some.next(), new Windowed<>(keyA, new SessionWindow(0, 0)), "1"); verifyWindowedKeyValue(some.next(), new Windowed<>(keyAA, new SessionWindow(0, 0)), "1"); verifyWindowedKeyValue(some.next(), new Windowed<>(keyB, new SessionWindow(0, 0)), "1"); assertFalse(some.hasNext()); @@ -256,9 +330,35 @@ public void shouldPutBackwardFetchRangeFromCache() { assertEquals(3, cache.size()); - try (final KeyValueIterator, byte[]> some = cachingStore.backwardFindSessions(keyAA, keyB, 0, 0)) { + try (final KeyValueIterator, byte[]> some = + cachingStore.backwardFindSessions(keyAA, keyB, 0, 0)) { + verifyWindowedKeyValue(some.next(), new Windowed<>(keyB, new SessionWindow(0, 0)), "1"); + verifyWindowedKeyValue(some.next(), new Windowed<>(keyAA, new SessionWindow(0, 0)), "1"); + assertFalse(some.hasNext()); + } + + // infinite keyFrom case + try (final KeyValueIterator, byte[]> some = + cachingStore.backwardFindSessions(null, keyAA, 0, 0)) { + verifyWindowedKeyValue(some.next(), new Windowed<>(keyAA, new SessionWindow(0, 0)), "1"); + verifyWindowedKeyValue(some.next(), new Windowed<>(keyA, new SessionWindow(0, 0)), "1"); + assertFalse(some.hasNext()); + } + + // infinite keyTo case + try (final KeyValueIterator, byte[]> some = + cachingStore.backwardFindSessions(keyAA, keyB, 0, 0)) { + verifyWindowedKeyValue(some.next(), new Windowed<>(keyB, new SessionWindow(0, 0)), "1"); + verifyWindowedKeyValue(some.next(), new Windowed<>(keyAA, new SessionWindow(0, 0)), "1"); + assertFalse(some.hasNext()); + } + + // infinite keyFrom and keyTo case + try (final KeyValueIterator, byte[]> some = + cachingStore.backwardFindSessions(null, null, 0, 0)) { verifyWindowedKeyValue(some.next(), new Windowed<>(keyB, new SessionWindow(0, 0)), "1"); verifyWindowedKeyValue(some.next(), new Windowed<>(keyAA, new SessionWindow(0, 0)), "1"); + verifyWindowedKeyValue(some.next(), new Windowed<>(keyA, new SessionWindow(0, 0)), "1"); assertFalse(some.hasNext()); } } @@ -646,26 +746,6 @@ public void shouldThrowNullPointerExceptionOnFindSessionsNullKey() { assertThrows(NullPointerException.class, () -> cachingStore.findSessions(null, 1L, 2L)); } - @Test - public void shouldThrowNullPointerExceptionOnFindSessionsNullFromKey() { - assertThrows(NullPointerException.class, () -> cachingStore.findSessions(null, keyA, 1L, 2L)); - } - - @Test - public void shouldThrowNullPointerExceptionOnFindSessionsNullToKey() { - assertThrows(NullPointerException.class, () -> cachingStore.findSessions(keyA, null, 1L, 2L)); - } - - @Test - public void shouldThrowNullPointerExceptionOnFetchNullFromKey() { - assertThrows(NullPointerException.class, () -> cachingStore.fetch(null, keyA)); - } - - @Test - public void shouldThrowNullPointerExceptionOnFetchNullToKey() { - assertThrows(NullPointerException.class, () -> cachingStore.fetch(keyA, null)); - } - @Test public void shouldThrowNullPointerExceptionOnFetchNullKey() { assertThrows(NullPointerException.class, () -> cachingStore.fetch(null)); diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingPersistentSessionStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingPersistentSessionStoreTest.java index 9af329df538bc..cbf4331c39747 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingPersistentSessionStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingPersistentSessionStoreTest.java @@ -133,7 +133,34 @@ public void shouldPutFetchAllKeysFromCache() { assertEquals(3, cache.size()); try (final KeyValueIterator, byte[]> all = - cachingStore.findSessions(keyA, keyB, 0, 0)) { + cachingStore.fetch(keyA, keyB)) { + verifyWindowedKeyValue(all.next(), new Windowed<>(keyA, new SessionWindow(0, 0)), "1"); + verifyWindowedKeyValue(all.next(), new Windowed<>(keyAA, new SessionWindow(0, 0)), "1"); + verifyWindowedKeyValue(all.next(), new Windowed<>(keyB, new SessionWindow(0, 0)), "1"); + assertFalse(all.hasNext()); + } + + // infinite keyFrom fetch + try (final KeyValueIterator, byte[]> all = + cachingStore.fetch(null, keyB)) { + verifyWindowedKeyValue(all.next(), new Windowed<>(keyA, new SessionWindow(0, 0)), "1"); + verifyWindowedKeyValue(all.next(), new Windowed<>(keyAA, new SessionWindow(0, 0)), "1"); + verifyWindowedKeyValue(all.next(), new Windowed<>(keyB, new SessionWindow(0, 0)), "1"); + assertFalse(all.hasNext()); + } + + // infinite keyTo fetch + try (final KeyValueIterator, byte[]> all = + cachingStore.fetch(keyA, null)) { + verifyWindowedKeyValue(all.next(), new Windowed<>(keyA, new SessionWindow(0, 0)), "1"); + verifyWindowedKeyValue(all.next(), new Windowed<>(keyAA, new SessionWindow(0, 0)), "1"); + verifyWindowedKeyValue(all.next(), new Windowed<>(keyB, new SessionWindow(0, 0)), "1"); + assertFalse(all.hasNext()); + } + + // infinite keyFrom and keyTo fetch + try (final KeyValueIterator, byte[]> all = + cachingStore.fetch(null, null)) { verifyWindowedKeyValue(all.next(), new Windowed<>(keyA, new SessionWindow(0, 0)), "1"); verifyWindowedKeyValue(all.next(), new Windowed<>(keyAA, new SessionWindow(0, 0)), "1"); verifyWindowedKeyValue(all.next(), new Windowed<>(keyB, new SessionWindow(0, 0)), "1"); @@ -150,7 +177,34 @@ public void shouldPutBackwardFetchAllKeysFromCache() { assertEquals(3, cache.size()); try (final KeyValueIterator, byte[]> all = - cachingStore.backwardFindSessions(keyA, keyB, 0, 0)) { + cachingStore.backwardFetch(keyA, keyB)) { + verifyWindowedKeyValue(all.next(), new Windowed<>(keyB, new SessionWindow(0, 0)), "1"); + verifyWindowedKeyValue(all.next(), new Windowed<>(keyAA, new SessionWindow(0, 0)), "1"); + verifyWindowedKeyValue(all.next(), new Windowed<>(keyA, new SessionWindow(0, 0)), "1"); + assertFalse(all.hasNext()); + } + + // infinite keyFrom fetch + try (final KeyValueIterator, byte[]> all = + cachingStore.backwardFetch(null, keyB)) { + verifyWindowedKeyValue(all.next(), new Windowed<>(keyB, new SessionWindow(0, 0)), "1"); + verifyWindowedKeyValue(all.next(), new Windowed<>(keyAA, new SessionWindow(0, 0)), "1"); + verifyWindowedKeyValue(all.next(), new Windowed<>(keyA, new SessionWindow(0, 0)), "1"); + assertFalse(all.hasNext()); + } + + // infinite keyTo fetch + try (final KeyValueIterator, byte[]> all = + cachingStore.backwardFetch(keyA, null)) { + verifyWindowedKeyValue(all.next(), new Windowed<>(keyB, new SessionWindow(0, 0)), "1"); + verifyWindowedKeyValue(all.next(), new Windowed<>(keyAA, new SessionWindow(0, 0)), "1"); + verifyWindowedKeyValue(all.next(), new Windowed<>(keyA, new SessionWindow(0, 0)), "1"); + assertFalse(all.hasNext()); + } + + // infinite keyFrom and keyTo fetch + try (final KeyValueIterator, byte[]> all = + cachingStore.backwardFetch(null, null)) { verifyWindowedKeyValue(all.next(), new Windowed<>(keyB, new SessionWindow(0, 0)), "1"); verifyWindowedKeyValue(all.next(), new Windowed<>(keyAA, new SessionWindow(0, 0)), "1"); verifyWindowedKeyValue(all.next(), new Windowed<>(keyA, new SessionWindow(0, 0)), "1"); @@ -233,6 +287,31 @@ public void shouldPutFetchRangeFromCache() { verifyWindowedKeyValue(some.next(), new Windowed<>(keyB, new SessionWindow(0, 0)), "1"); assertFalse(some.hasNext()); } + + // infinite keyFrom case + try (final KeyValueIterator, byte[]> some = + cachingStore.findSessions(null, keyAA, 0, 0)) { + verifyWindowedKeyValue(some.next(), new Windowed<>(keyA, new SessionWindow(0, 0)), "1"); + verifyWindowedKeyValue(some.next(), new Windowed<>(keyAA, new SessionWindow(0, 0)), "1"); + assertFalse(some.hasNext()); + } + + // infinite keyTo case + try (final KeyValueIterator, byte[]> some = + cachingStore.findSessions(keyAA, keyB, 0, 0)) { + verifyWindowedKeyValue(some.next(), new Windowed<>(keyAA, new SessionWindow(0, 0)), "1"); + verifyWindowedKeyValue(some.next(), new Windowed<>(keyB, new SessionWindow(0, 0)), "1"); + assertFalse(some.hasNext()); + } + + // infinite keyFrom and keyTo case + try (final KeyValueIterator, byte[]> some = + cachingStore.findSessions(null, null, 0, 0)) { + verifyWindowedKeyValue(some.next(), new Windowed<>(keyA, new SessionWindow(0, 0)), "1"); + verifyWindowedKeyValue(some.next(), new Windowed<>(keyAA, new SessionWindow(0, 0)), "1"); + verifyWindowedKeyValue(some.next(), new Windowed<>(keyB, new SessionWindow(0, 0)), "1"); + assertFalse(some.hasNext()); + } } @Test @@ -249,6 +328,31 @@ public void shouldPutBackwardFetchRangeFromCache() { verifyWindowedKeyValue(some.next(), new Windowed<>(keyAA, new SessionWindow(0, 0)), "1"); assertFalse(some.hasNext()); } + + // infinite keyFrom case + try (final KeyValueIterator, byte[]> some = + cachingStore.backwardFindSessions(null, keyAA, 0, 0)) { + verifyWindowedKeyValue(some.next(), new Windowed<>(keyAA, new SessionWindow(0, 0)), "1"); + verifyWindowedKeyValue(some.next(), new Windowed<>(keyA, new SessionWindow(0, 0)), "1"); + assertFalse(some.hasNext()); + } + + // infinite keyTo case + try (final KeyValueIterator, byte[]> some = + cachingStore.backwardFindSessions(keyAA, keyB, 0, 0)) { + verifyWindowedKeyValue(some.next(), new Windowed<>(keyB, new SessionWindow(0, 0)), "1"); + verifyWindowedKeyValue(some.next(), new Windowed<>(keyAA, new SessionWindow(0, 0)), "1"); + assertFalse(some.hasNext()); + } + + // infinite keyFrom and keyTo case + try (final KeyValueIterator, byte[]> some = + cachingStore.backwardFindSessions(null, null, 0, 0)) { + verifyWindowedKeyValue(some.next(), new Windowed<>(keyB, new SessionWindow(0, 0)), "1"); + verifyWindowedKeyValue(some.next(), new Windowed<>(keyAA, new SessionWindow(0, 0)), "1"); + verifyWindowedKeyValue(some.next(), new Windowed<>(keyA, new SessionWindow(0, 0)), "1"); + assertFalse(some.hasNext()); + } } @Test @@ -653,26 +757,6 @@ public void shouldThrowNullPointerExceptionOnFindSessionsNullKey() { assertThrows(NullPointerException.class, () -> cachingStore.findSessions(null, 1L, 2L)); } - @Test - public void shouldThrowNullPointerExceptionOnFindSessionsNullFromKey() { - assertThrows(NullPointerException.class, () -> cachingStore.findSessions(null, keyA, 1L, 2L)); - } - - @Test - public void shouldThrowNullPointerExceptionOnFindSessionsNullToKey() { - assertThrows(NullPointerException.class, () -> cachingStore.findSessions(keyA, null, 1L, 2L)); - } - - @Test - public void shouldThrowNullPointerExceptionOnFetchNullFromKey() { - assertThrows(NullPointerException.class, () -> cachingStore.fetch(null, keyA)); - } - - @Test - public void shouldThrowNullPointerExceptionOnFetchNullToKey() { - assertThrows(NullPointerException.class, () -> cachingStore.fetch(keyA, null)); - } - @Test public void shouldThrowNullPointerExceptionOnFetchNullKey() { assertThrows(NullPointerException.class, () -> cachingStore.fetch(null)); diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/CompositeReadOnlySessionStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/CompositeReadOnlySessionStoreTest.java index 66ae2431c5938..c2d38def4a91d 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/CompositeReadOnlySessionStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/CompositeReadOnlySessionStoreTest.java @@ -146,21 +146,52 @@ public void shouldFetchKeyRangeAcrossStores() { underlyingSessionStore.put(new Windowed<>("a", new SessionWindow(0, 0)), 0L); secondUnderlying.put(new Windowed<>("b", new SessionWindow(0, 0)), 10L); final List, Long>> results = StreamsTestUtils.toList(sessionStore.fetch("a", "b")); - assertThat(results.size(), equalTo(2)); + assertThat(results, equalTo(Arrays.asList( + KeyValue.pair(new Windowed<>("a", new SessionWindow(0, 0)), 0L), + KeyValue.pair(new Windowed<>("b", new SessionWindow(0, 0)), 10L)))); } @Test - public void shouldThrowNPEIfKeyIsNull() { - assertThrows(NullPointerException.class, () -> underlyingSessionStore.fetch(null)); + public void shouldFetchKeyRangeAcrossStoresWithNullKeyFrom() { + final ReadOnlySessionStoreStub secondUnderlying = new + ReadOnlySessionStoreStub<>(); + stubProviderTwo.addStore(storeName, secondUnderlying); + underlyingSessionStore.put(new Windowed<>("a", new SessionWindow(0, 0)), 0L); + secondUnderlying.put(new Windowed<>("b", new SessionWindow(0, 0)), 10L); + final List, Long>> results = StreamsTestUtils.toList(sessionStore.fetch(null, "b")); + assertThat(results, equalTo(Arrays.asList( + KeyValue.pair(new Windowed<>("a", new SessionWindow(0, 0)), 0L), + KeyValue.pair(new Windowed<>("b", new SessionWindow(0, 0)), 10L)))); } @Test - public void shouldThrowNPEIfFromKeyIsNull() { - assertThrows(NullPointerException.class, () -> underlyingSessionStore.fetch(null, "a")); + public void shouldFetchKeyRangeAcrossStoresWithNullKeyTo() { + final ReadOnlySessionStoreStub secondUnderlying = new + ReadOnlySessionStoreStub<>(); + stubProviderTwo.addStore(storeName, secondUnderlying); + underlyingSessionStore.put(new Windowed<>("a", new SessionWindow(0, 0)), 0L); + secondUnderlying.put(new Windowed<>("b", new SessionWindow(0, 0)), 10L); + final List, Long>> results = StreamsTestUtils.toList(sessionStore.fetch("a", null)); + assertThat(results, equalTo(Arrays.asList( + KeyValue.pair(new Windowed<>("a", new SessionWindow(0, 0)), 0L), + KeyValue.pair(new Windowed<>("b", new SessionWindow(0, 0)), 10L)))); } @Test - public void shouldThrowNPEIfToKeyIsNull() { - assertThrows(NullPointerException.class, () -> underlyingSessionStore.fetch("a", null)); + public void shouldFetchKeyRangeAcrossStoresWithNullKeyFromKeyTo() { + final ReadOnlySessionStoreStub secondUnderlying = new + ReadOnlySessionStoreStub<>(); + stubProviderTwo.addStore(storeName, secondUnderlying); + underlyingSessionStore.put(new Windowed<>("a", new SessionWindow(0, 0)), 0L); + secondUnderlying.put(new Windowed<>("b", new SessionWindow(0, 0)), 10L); + final List, Long>> results = StreamsTestUtils.toList(sessionStore.fetch(null, null)); + assertThat(results, equalTo(Arrays.asList( + KeyValue.pair(new Windowed<>("a", new SessionWindow(0, 0)), 0L), + KeyValue.pair(new Windowed<>("b", new SessionWindow(0, 0)), 10L)))); + } + + @Test + public void shouldThrowNPEIfKeyIsNull() { + assertThrows(NullPointerException.class, () -> underlyingSessionStore.fetch(null)); } } diff --git a/streams/src/test/java/org/apache/kafka/test/ReadOnlySessionStoreStub.java b/streams/src/test/java/org/apache/kafka/test/ReadOnlySessionStoreStub.java index 06640559cb4bb..61ea1ff0fd374 100644 --- a/streams/src/test/java/org/apache/kafka/test/ReadOnlySessionStoreStub.java +++ b/streams/src/test/java/org/apache/kafka/test/ReadOnlySessionStoreStub.java @@ -94,11 +94,13 @@ public KeyValueIterator, V> fetch(final K keyFrom, final K keyTo) { if (!open) { throw new InvalidStateStoreException("not open"); } - if (sessions.subMap(keyFrom, true, keyTo, true).isEmpty()) { + + NavigableMap, V>>> subSessionsMap = getSubSessionsMap(keyFrom, keyTo); + + if (subSessionsMap.isEmpty()) { return new KeyValueIteratorStub<>(Collections., V>>emptyIterator()); } - final Iterator, V>>> keysIterator = sessions.subMap(keyFrom, true, - keyTo, true).values().iterator(); + final Iterator, V>>> keysIterator = subSessionsMap.values().iterator(); return new KeyValueIteratorStub<>( new Iterator, V>>() { @@ -124,16 +126,33 @@ public KeyValue, V> next() { ); } + private NavigableMap, V>>> getSubSessionsMap(final K keyFrom, final K keyTo) { + final NavigableMap, V>>> subSessionsMap; + if (keyFrom == null && keyTo == null) { // fetch all + subSessionsMap = sessions; + } else if (keyFrom == null) { + subSessionsMap = sessions.headMap(keyTo, true); + } else if (keyTo == null) { + subSessionsMap = sessions.tailMap(keyFrom, true); + } else { + subSessionsMap = sessions.subMap(keyFrom, true, keyTo, true); + } + return subSessionsMap; + } + @Override public KeyValueIterator, V> backwardFetch(K keyFrom, K keyTo) { if (!open) { throw new InvalidStateStoreException("not open"); } - if (sessions.subMap(keyFrom, true, keyTo, true).isEmpty()) { + + NavigableMap, V>>> subSessionsMap = getSubSessionsMap(keyFrom, keyTo); + + if (subSessionsMap.isEmpty()) { return new KeyValueIteratorStub<>(Collections.emptyIterator()); } - final Iterator, V>>> keysIterator = - sessions.subMap(keyFrom, true, keyTo, true).descendingMap().values().iterator(); + + final Iterator, V>>> keysIterator = subSessionsMap.descendingMap().values().iterator(); return new KeyValueIteratorStub<>( new Iterator, V>>() { From c9ef98d6b483f95977f31a2e2bb15b5ae19797c1 Mon Sep 17 00:00:00 2001 From: Luke Chen Date: Thu, 16 Sep 2021 17:16:19 +0800 Subject: [PATCH 2/2] revert --- .../streams/state/ReadOnlySessionStore.java | 17 +- .../AbstractRocksDBSegmentedBytesStore.java | 6 +- .../state/internals/CachingSessionStore.java | 30 ++- .../CompositeReadOnlySessionStore.java | 8 + .../state/internals/InMemorySessionStore.java | 60 +++-- .../state/internals/MeteredSessionStore.java | 16 +- .../state/internals/SegmentIterator.java | 2 +- .../AbstractSessionBytesStoreTest.java | 230 ++++++++--------- .../CachingInMemorySessionStoreTest.java | 130 ++-------- .../CachingPersistentSessionStoreTest.java | 128 ++-------- .../CompositeReadOnlySessionStoreTest.java | 45 +--- .../internals/SessionStoreFetchTest.java | 232 ++++++++++++++++++ .../kafka/test/ReadOnlySessionStoreStub.java | 31 +-- 13 files changed, 470 insertions(+), 465 deletions(-) create mode 100644 streams/src/test/java/org/apache/kafka/streams/state/internals/SessionStoreFetchTest.java diff --git a/streams/src/main/java/org/apache/kafka/streams/state/ReadOnlySessionStore.java b/streams/src/main/java/org/apache/kafka/streams/state/ReadOnlySessionStore.java index 7886c989b958c..0ade24286fdd6 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/ReadOnlySessionStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/ReadOnlySessionStore.java @@ -126,15 +126,14 @@ default KeyValueIterator, AGG> backwardFindSessions(final K key, * This iterator must be closed after use. * * @param keyFrom The first key that could be in the range - * A null value indicates a starting position from the first element in the store. * @param keyTo The last key that could be in the range - * A null value indicates that the range ends with the last element in the store. * @param earliestSessionEndTime the end timestamp of the earliest session to search for, where * iteration starts. * @param latestSessionStartTime the end timestamp of the latest session to search for, where * iteration ends. * @return iterator of sessions with the matching keys and aggregated values, from earliest to * latest session time. + * @throws NullPointerException If null is used for any key. */ default KeyValueIterator, AGG> findSessions(final K keyFrom, final K keyTo, @@ -152,15 +151,14 @@ default KeyValueIterator, AGG> findSessions(final K keyFrom, * This iterator must be closed after use. * * @param keyFrom The first key that could be in the range - * A null value indicates a starting position from the first element in the store. * @param keyTo The last key that could be in the range - * A null value indicates that the range ends with the last element in the store. * @param earliestSessionEndTime the end timestamp of the earliest session to search for, where * iteration starts. * @param latestSessionStartTime the end timestamp of the latest session to search for, where * iteration ends. * @return iterator of sessions with the matching keys and aggregated values, from earliest to * latest session time. + * @throws NullPointerException If null is used for any key. */ default KeyValueIterator, AGG> findSessions(final K keyFrom, final K keyTo, @@ -178,15 +176,14 @@ default KeyValueIterator, AGG> findSessions(final K keyFrom, * This iterator must be closed after use. * * @param keyFrom The first key that could be in the range - * A null value indicates a starting position from the first element in the store. * @param keyTo The last key that could be in the range - * A null value indicates that the range ends with the last element in the store. * @param earliestSessionEndTime the end timestamp of the earliest session to search for, where * iteration ends. * @param latestSessionStartTime the end timestamp of the latest session to search for, where * iteration starts. * @return backward iterator of sessions with the matching keys and aggregated values, from * latest to earliest session time. + * @throws NullPointerException If null is used for any key. */ default KeyValueIterator, AGG> backwardFindSessions(final K keyFrom, final K keyTo, @@ -204,15 +201,14 @@ default KeyValueIterator, AGG> backwardFindSessions(final K keyFrom, * This iterator must be closed after use. * * @param keyFrom The first key that could be in the range - * A null value indicates a starting position from the first element in the store. * @param keyTo The last key that could be in the range - * A null value indicates that the range ends with the last element in the store. * @param earliestSessionEndTime the end timestamp of the earliest session to search for, where * iteration ends. * @param latestSessionStartTime the end timestamp of the latest session to search for, where * iteration starts. * @return backward iterator of sessions with the matching keys and aggregated values, from * latest to earliest session time. + * @throws NullPointerException If null is used for any key. */ default KeyValueIterator, AGG> backwardFindSessions(final K keyFrom, final K keyTo, @@ -293,11 +289,10 @@ default KeyValueIterator, AGG> backwardFetch(final K key) { * available session to the newest/latest session. * * @param keyFrom first key in the range to find aggregated session values for - * A null value indicates a starting position from the first element in the store. * @param keyTo last key in the range to find aggregated session values for - * A null value indicates that the range ends with the last element in the store. * @return KeyValueIterator containing all sessions for the provided key, from oldest to newest * session. + * @throws NullPointerException If null is used for any of the keys. */ KeyValueIterator, AGG> fetch(final K keyFrom, final K keyTo); @@ -309,9 +304,7 @@ default KeyValueIterator, AGG> backwardFetch(final K key) { * available session to the oldest/earliest session. * * @param keyFrom first key in the range to find aggregated session values for - * A null value indicates a starting position from the first element in the store. * @param keyTo last key in the range to find aggregated session values for - * A null value indicates that the range ends with the last element in the store. * @return backward KeyValueIterator containing all sessions for the provided key, from newest * to oldest session. * @throws NullPointerException If null is used for any of the keys. diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStore.java index bfee6b2754a8f..f7aef116a91ca 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStore.java @@ -114,7 +114,7 @@ KeyValueIterator fetch(final Bytes keyFrom, final long from, final long to, final boolean forward) { - if (keyFrom != null && keyTo != null && keyFrom.compareTo(keyTo) > 0) { + if (keyFrom.compareTo(keyTo) > 0) { LOG.warn("Returning empty iterator for fetch with invalid key range: from > to. " + "This may be due to range arguments set in the wrong order, " + "or serdes that don't preserve ordering when lexicographically comparing the serialized bytes. " + @@ -124,8 +124,8 @@ KeyValueIterator fetch(final Bytes keyFrom, final List searchSpace = keySchema.segmentsToSearch(segments, from, to, forward); - final Bytes binaryFrom = keyFrom == null ? null : keySchema.lowerRange(keyFrom, from); - final Bytes binaryTo = keyTo == null ? null : keySchema.upperRange(keyTo, to); + final Bytes binaryFrom = keySchema.lowerRange(keyFrom, from); + final Bytes binaryTo = keySchema.upperRange(keyTo, to); return new SegmentIterator<>( searchSpace.iterator(), diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingSessionStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingSessionStore.java index 9b07fe82e6d9d..1cfb8ce498a64 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingSessionStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingSessionStore.java @@ -47,10 +47,6 @@ class CachingSessionStore private final SessionKeySchema keySchema; private final SegmentedCacheFunction cacheFunction; - private static final String INVALID_RANGE_WARN_MSG = "Returning empty iterator for fetch with invalid key range: from > to. " + - "This may be due to range arguments set in the wrong order, " + - "or serdes that don't preserve ordering when lexicographically comparing the serialized bytes. " + - "Note that the built-in numerical serdes do not follow this for negative numbers"; private String cacheName; private InternalProcessorContext context; @@ -216,15 +212,18 @@ public KeyValueIterator, byte[]> findSessions(final Bytes keyFro final Bytes keyTo, final long earliestSessionEndTime, final long latestSessionStartTime) { - if (keyFrom != null && keyTo != null && keyFrom.compareTo(keyTo) > 0) { - LOG.warn(INVALID_RANGE_WARN_MSG); + if (keyFrom.compareTo(keyTo) > 0) { + LOG.warn("Returning empty iterator for fetch with invalid key range: from > to. " + + "This may be due to range arguments set in the wrong order, " + + "or serdes that don't preserve ordering when lexicographically comparing the serialized bytes. " + + "Note that the built-in numerical serdes do not follow this for negative numbers"); return KeyValueIterators.emptyIterator(); } validateStoreOpen(); - final Bytes cacheKeyFrom = keyFrom == null ? null : cacheFunction.cacheKey(keySchema.lowerRange(keyFrom, earliestSessionEndTime)); - final Bytes cacheKeyTo = keyTo == null ? null : cacheFunction.cacheKey(keySchema.upperRange(keyTo, latestSessionStartTime)); + final Bytes cacheKeyFrom = cacheFunction.cacheKey(keySchema.lowerRange(keyFrom, earliestSessionEndTime)); + final Bytes cacheKeyTo = cacheFunction.cacheKey(keySchema.upperRange(keyTo, latestSessionStartTime)); final ThreadCache.MemoryLRUCacheBytesIterator cacheIterator = context.cache().range(cacheName, cacheKeyFrom, cacheKeyTo); final KeyValueIterator, byte[]> storeIterator = wrapped().findSessions( @@ -244,15 +243,18 @@ public KeyValueIterator, byte[]> backwardFindSessions(final Byte final Bytes keyTo, final long earliestSessionEndTime, final long latestSessionStartTime) { - if (keyFrom != null && keyTo != null && keyFrom.compareTo(keyTo) > 0) { - LOG.warn(INVALID_RANGE_WARN_MSG); + if (keyFrom.compareTo(keyTo) > 0) { + LOG.warn("Returning empty iterator for fetch with invalid key range: from > to. " + + "This may be due to range arguments set in the wrong order, " + + "or serdes that don't preserve ordering when lexicographically comparing the serialized bytes. " + + "Note that the built-in numerical serdes do not follow this for negative numbers"); return KeyValueIterators.emptyIterator(); } validateStoreOpen(); - final Bytes cacheKeyFrom = keyFrom == null ? null : cacheFunction.cacheKey(keySchema.lowerRange(keyFrom, earliestSessionEndTime)); - final Bytes cacheKeyTo = keyTo == null ? null : cacheFunction.cacheKey(keySchema.upperRange(keyTo, latestSessionStartTime)); + final Bytes cacheKeyFrom = cacheFunction.cacheKey(keySchema.lowerRange(keyFrom, earliestSessionEndTime)); + final Bytes cacheKeyTo = cacheFunction.cacheKey(keySchema.upperRange(keyTo, latestSessionStartTime)); final ThreadCache.MemoryLRUCacheBytesIterator cacheIterator = context.cache().reverseRange(cacheName, cacheKeyFrom, cacheKeyTo); final KeyValueIterator, byte[]> storeIterator = @@ -302,12 +304,16 @@ public KeyValueIterator, byte[]> backwardFetch(final Bytes key) @Override public KeyValueIterator, byte[]> fetch(final Bytes keyFrom, final Bytes keyTo) { + Objects.requireNonNull(keyFrom, "keyFrom cannot be null"); + Objects.requireNonNull(keyTo, "keyTo cannot be null"); return findSessions(keyFrom, keyTo, 0, Long.MAX_VALUE); } @Override public KeyValueIterator, byte[]> backwardFetch(final Bytes keyFrom, final Bytes keyTo) { + Objects.requireNonNull(keyFrom, "keyFrom cannot be null"); + Objects.requireNonNull(keyTo, "keyTo cannot be null"); return backwardFindSessions(keyFrom, keyTo, 0, Long.MAX_VALUE); } diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/CompositeReadOnlySessionStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/CompositeReadOnlySessionStore.java index 0f153cc4bf545..fb5fb61ae6108 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/CompositeReadOnlySessionStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/CompositeReadOnlySessionStore.java @@ -101,6 +101,8 @@ public KeyValueIterator, V> findSessions(final K keyFrom, final K keyTo, final long earliestSessionEndTime, final long latestSessionStartTime) { + Objects.requireNonNull(keyFrom, "from can't be null"); + Objects.requireNonNull(keyTo, "to can't be null"); final List> stores = storeProvider.stores(storeName, queryableStoreType); for (final ReadOnlySessionStore store : stores) { try { @@ -128,6 +130,8 @@ public KeyValueIterator, V> backwardFindSessions(final K keyFrom, final K keyTo, final long earliestSessionEndTime, final long latestSessionStartTime) { + Objects.requireNonNull(keyFrom, "from can't be null"); + Objects.requireNonNull(keyTo, "to can't be null"); final List> stores = storeProvider.stores(storeName, queryableStoreType); for (final ReadOnlySessionStore store : stores) { try { @@ -217,6 +221,8 @@ public KeyValueIterator, V> backwardFetch(final K key) { @Override public KeyValueIterator, V> fetch(final K keyFrom, final K keyTo) { + Objects.requireNonNull(keyFrom, "keyFrom can't be null"); + Objects.requireNonNull(keyTo, "keyTo can't be null"); final NextIteratorFunction, V, ReadOnlySessionStore> nextIteratorFunction = store -> store.fetch(keyFrom, keyTo); return new DelegatingPeekingKeyValueIterator<>(storeName, @@ -227,6 +233,8 @@ public KeyValueIterator, V> fetch(final K keyFrom, final K keyTo) { @Override public KeyValueIterator, V> backwardFetch(final K keyFrom, final K keyTo) { + Objects.requireNonNull(keyFrom, "keyFrom can't be null"); + Objects.requireNonNull(keyTo, "keyTo can't be null"); final NextIteratorFunction, V, ReadOnlySessionStore> nextIteratorFunction = store -> store.backwardFetch(keyFrom, keyTo); return new DelegatingPeekingKeyValueIterator<>( diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemorySessionStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemorySessionStore.java index 7785ee808395c..a14d3e11cefec 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemorySessionStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemorySessionStore.java @@ -54,12 +54,6 @@ public class InMemorySessionStore implements SessionStore { private final long retentionPeriod; - private final static String INVALID_RANGE_WARN_MSG = - "Returning empty iterator for fetch with invalid key range: from > to. " + - "This may be due to range arguments set in the wrong order, " + - "or serdes that don't preserve ordering when lexicographically comparing the serialized bytes. " + - "Note that the built-in numerical serdes do not follow this for negative numbers"; - private final ConcurrentNavigableMap>> endTimeMap = new ConcurrentSkipListMap<>(); private final Set openIterators = ConcurrentHashMap.newKeySet(); @@ -211,10 +205,16 @@ public KeyValueIterator, byte[]> findSessions(final Bytes keyFro final Bytes keyTo, final long earliestSessionEndTime, final long latestSessionStartTime) { + Objects.requireNonNull(keyFrom, "from key cannot be null"); + Objects.requireNonNull(keyTo, "to key cannot be null"); + removeExpiredSegments(); - if (keyFrom != null && keyTo != null && keyFrom.compareTo(keyTo) > 0) { - LOG.warn(INVALID_RANGE_WARN_MSG); + if (keyFrom.compareTo(keyTo) > 0) { + LOG.warn("Returning empty iterator for fetch with invalid key range: from > to. " + + "This may be due to range arguments set in the wrong order, " + + "or serdes that don't preserve ordering when lexicographically comparing the serialized bytes. " + + "Note that the built-in numerical serdes do not follow this for negative numbers"); return KeyValueIterators.emptyIterator(); } @@ -230,10 +230,16 @@ public KeyValueIterator, byte[]> backwardFindSessions(final Byte final Bytes keyTo, final long earliestSessionEndTime, final long latestSessionStartTime) { + Objects.requireNonNull(keyFrom, "from key cannot be null"); + Objects.requireNonNull(keyTo, "to key cannot be null"); + removeExpiredSegments(); - if (keyFrom != null && keyTo != null && keyFrom.compareTo(keyTo) > 0) { - LOG.warn(INVALID_RANGE_WARN_MSG); + if (keyFrom.compareTo(keyTo) > 0) { + LOG.warn("Returning empty iterator for fetch with invalid key range: from > to. " + + "This may be due to range arguments set in the wrong order, " + + "or serdes that don't preserve ordering when lexicographically comparing the serialized bytes. " + + "Note that the built-in numerical serdes do not follow this for negative numbers"); return KeyValueIterators.emptyIterator(); } @@ -269,17 +275,24 @@ public KeyValueIterator, byte[]> backwardFetch(final Bytes key) @Override public KeyValueIterator, byte[]> fetch(final Bytes keyFrom, final Bytes keyTo) { + Objects.requireNonNull(keyFrom, "from key cannot be null"); + Objects.requireNonNull(keyTo, "to key cannot be null"); + removeExpiredSegments(); - return registerNewIterator(keyFrom, keyTo, Long.MAX_VALUE, endTimeMap.entrySet().iterator(), false); + + return registerNewIterator(keyFrom, keyTo, Long.MAX_VALUE, endTimeMap.entrySet().iterator(), true); } @Override public KeyValueIterator, byte[]> backwardFetch(final Bytes keyFrom, final Bytes keyTo) { + Objects.requireNonNull(keyFrom, "from key cannot be null"); + Objects.requireNonNull(keyTo, "to key cannot be null"); + removeExpiredSegments(); return registerNewIterator( - keyFrom, keyTo, Long.MAX_VALUE, endTimeMap.descendingMap().entrySet().iterator(), true); + keyFrom, keyTo, Long.MAX_VALUE, endTimeMap.descendingMap().entrySet().iterator(), false); } @Override @@ -444,22 +457,17 @@ private void setAllIterators() { while (endTimeIterator.hasNext()) { final Entry>> nextEndTimeEntry = endTimeIterator.next(); currentEndTime = nextEndTimeEntry.getKey(); - - final ConcurrentNavigableMap> subKVMap; - if (keyFrom == null && keyTo == null) { - subKVMap = nextEndTimeEntry.getValue(); - } else if (keyFrom == null) { - subKVMap = nextEndTimeEntry.getValue().headMap(keyTo, true); - } else if (keyTo == null) { - subKVMap = nextEndTimeEntry.getValue().tailMap(keyFrom, true); - } else { - subKVMap = nextEndTimeEntry.getValue().subMap(keyFrom, true, keyTo, true); - } - if (forward) { - keyIterator = subKVMap.entrySet().iterator(); + keyIterator = nextEndTimeEntry.getValue() + .subMap(keyFrom, true, keyTo, true) + .entrySet() + .iterator(); } else { - keyIterator = subKVMap.descendingMap().entrySet().iterator(); + keyIterator = nextEndTimeEntry.getValue() + .subMap(keyFrom, true, keyTo, true) + .descendingMap() + .entrySet() + .iterator(); } if (setInnerIterators()) { diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java index 529f6165ca7ab..f7178999b0fd5 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java @@ -265,6 +265,8 @@ public KeyValueIterator, V> backwardFetch(final K key) { @Override public KeyValueIterator, V> fetch(final K keyFrom, final K keyTo) { + Objects.requireNonNull(keyFrom, "keyFrom cannot be null"); + Objects.requireNonNull(keyTo, "keyTo cannot be null"); return new MeteredWindowedKeyValueIterator<>( wrapped().fetch(keyBytes(keyFrom), keyBytes(keyTo)), fetchSensor, @@ -276,6 +278,8 @@ public KeyValueIterator, V> fetch(final K keyFrom, @Override public KeyValueIterator, V> backwardFetch(final K keyFrom, final K keyTo) { + Objects.requireNonNull(keyFrom, "keyFrom cannot be null"); + Objects.requireNonNull(keyTo, "keyTo cannot be null"); return new MeteredWindowedKeyValueIterator<>( wrapped().backwardFetch(keyBytes(keyFrom), keyBytes(keyTo)), fetchSensor, @@ -326,8 +330,10 @@ public KeyValueIterator, V> findSessions(final K keyFrom, final K keyTo, final long earliestSessionEndTime, final long latestSessionStartTime) { - final Bytes bytesKeyFrom = keyFrom == null ? null : keyBytes(keyFrom); - final Bytes bytesKeyTo = keyTo == null ? null : keyBytes(keyTo); + Objects.requireNonNull(keyFrom, "keyFrom cannot be null"); + Objects.requireNonNull(keyTo, "keyTo cannot be null"); + final Bytes bytesKeyFrom = keyBytes(keyFrom); + final Bytes bytesKeyTo = keyBytes(keyTo); return new MeteredWindowedKeyValueIterator<>( wrapped().findSessions( bytesKeyFrom, @@ -345,8 +351,10 @@ public KeyValueIterator, V> backwardFindSessions(final K keyFrom, final K keyTo, final long earliestSessionEndTime, final long latestSessionStartTime) { - final Bytes bytesKeyFrom = keyFrom == null ? null : keyBytes(keyFrom); - final Bytes bytesKeyTo = keyTo == null ? null : keyBytes(keyTo); + Objects.requireNonNull(keyFrom, "keyFrom cannot be null"); + Objects.requireNonNull(keyTo, "keyTo cannot be null"); + final Bytes bytesKeyFrom = keyBytes(keyFrom); + final Bytes bytesKeyTo = keyBytes(keyTo); return new MeteredWindowedKeyValueIterator<>( wrapped().backwardFindSessions( bytesKeyFrom, diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/SegmentIterator.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/SegmentIterator.java index d84a1b958b14c..03b66a634f2db 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/SegmentIterator.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/SegmentIterator.java @@ -74,7 +74,7 @@ public boolean hasNext() { close(); currentSegment = segments.next(); try { - if (from == null && to == null) { + if (from == null || to == null) { if (forward) { currentIterator = currentSegment.all(); } else { diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractSessionBytesStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractSessionBytesStoreTest.java index 5f3ad61b1f95f..b2a1022f52862 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractSessionBytesStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractSessionBytesStoreTest.java @@ -47,15 +47,15 @@ import java.util.Arrays; import java.util.Collections; import java.util.HashSet; +import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.Properties; -import java.util.Set; import static java.util.Arrays.asList; import static org.apache.kafka.common.utils.Utils.mkEntry; import static org.apache.kafka.common.utils.Utils.mkMap; -import static org.apache.kafka.test.StreamsTestUtils.toSet; +import static org.apache.kafka.common.utils.Utils.toList; import static org.apache.kafka.test.StreamsTestUtils.valuesToSet; import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.CoreMatchers.hasItem; @@ -121,7 +121,7 @@ public void shouldPutAndFindSessionsInRange() { try (final KeyValueIterator, Long> values = sessionStore.findSessions(key, 0, 1000L) ) { - assertEquals(new HashSet<>(expected), toSet(values)); + assertEquals(expected, toList(values)); } final List, Long>> expected2 = @@ -129,7 +129,7 @@ public void shouldPutAndFindSessionsInRange() { try (final KeyValueIterator, Long> values2 = sessionStore.findSessions(key, 400L, 600L) ) { - assertEquals(new HashSet<>(expected2), toSet(values2)); + assertEquals(expected2, toList(values2)); } } @@ -143,28 +143,29 @@ public void shouldPutAndBackwardFindSessionsInRange() { sessionStore.put(new Windowed<>(key, new SessionWindow(1500L, 2000L)), 1L); sessionStore.put(new Windowed<>(key, new SessionWindow(2500L, 3000L)), 2L); - final List, Long>> expected = - asList(KeyValue.pair(a1, 1L), KeyValue.pair(a2, 2L)); + final LinkedList, Long>> expected = new LinkedList<>(); + expected.add(KeyValue.pair(a1, 1L)); + expected.add(KeyValue.pair(a2, 2L)); try (final KeyValueIterator, Long> values = sessionStore.backwardFindSessions(key, 0, 1000L)) { - assertEquals(new HashSet<>(expected), toSet(values)); + assertEquals(toList(expected.descendingIterator()), toList(values)); } final List, Long>> expected2 = Collections.singletonList(KeyValue.pair(a2, 2L)); try (final KeyValueIterator, Long> values2 = sessionStore.backwardFindSessions(key, 400L, 600L)) { - assertEquals(new HashSet<>(expected2), toSet(values2)); + assertEquals(expected2, toList(values2)); } } @Test public void shouldFetchAllSessionsWithSameRecordKey() { - final List, Long>> expected = Arrays.asList( - KeyValue.pair(new Windowed<>("a", new SessionWindow(0, 0)), 1L), - KeyValue.pair(new Windowed<>("a", new SessionWindow(10, 10)), 2L), - KeyValue.pair(new Windowed<>("a", new SessionWindow(100, 100)), 3L), - KeyValue.pair(new Windowed<>("a", new SessionWindow(1000, 1000)), 4L)); + final LinkedList, Long>> expected = new LinkedList<>(); + expected.add(KeyValue.pair(new Windowed<>("a", new SessionWindow(0, 0)), 1L)); + expected.add(KeyValue.pair(new Windowed<>("a", new SessionWindow(10, 10)), 2L)); + expected.add(KeyValue.pair(new Windowed<>("a", new SessionWindow(100, 100)), 3L)); + expected.add(KeyValue.pair(new Windowed<>("a", new SessionWindow(1000, 1000)), 4L)); for (final KeyValue, Long> kv : expected) { sessionStore.put(kv.key, kv.value); @@ -174,18 +175,17 @@ public void shouldFetchAllSessionsWithSameRecordKey() { sessionStore.put(new Windowed<>("aa", new SessionWindow(0, 0)), 5L); try (final KeyValueIterator, Long> values = sessionStore.fetch("a")) { - assertEquals(new HashSet<>(expected), toSet(values)); + assertEquals(expected, toList(values)); } } @Test public void shouldBackwardFetchAllSessionsWithSameRecordKey() { - final List, Long>> expected = asList( - KeyValue.pair(new Windowed<>("a", new SessionWindow(0, 0)), 1L), - KeyValue.pair(new Windowed<>("a", new SessionWindow(10, 10)), 2L), - KeyValue.pair(new Windowed<>("a", new SessionWindow(100, 100)), 3L), - KeyValue.pair(new Windowed<>("a", new SessionWindow(1000, 1000)), 4L) - ); + final LinkedList, Long>> expected = new LinkedList<>(); + expected.add(KeyValue.pair(new Windowed<>("a", new SessionWindow(0, 0)), 1L)); + expected.add(KeyValue.pair(new Windowed<>("a", new SessionWindow(10, 10)), 2L)); + expected.add(KeyValue.pair(new Windowed<>("a", new SessionWindow(100, 100)), 3L)); + expected.add(KeyValue.pair(new Windowed<>("a", new SessionWindow(1000, 1000)), 4L)); for (final KeyValue, Long> kv : expected) { sessionStore.put(kv.key, kv.value); @@ -195,95 +195,59 @@ public void shouldBackwardFetchAllSessionsWithSameRecordKey() { sessionStore.put(new Windowed<>("aa", new SessionWindow(0, 0)), 5L); try (final KeyValueIterator, Long> values = sessionStore.backwardFetch("a")) { - assertEquals(new HashSet<>(expected), toSet(values)); + assertEquals(toList(expected.descendingIterator()), toList(values)); } } @Test public void shouldFetchAllSessionsWithinKeyRange() { - final List, Long>> expected = new ArrayList<>(); + final List, Long>> expected = new LinkedList<>(); expected.add(KeyValue.pair(new Windowed<>("aa", new SessionWindow(10, 10)), 2L)); - expected.add(KeyValue.pair(new Windowed<>("b", new SessionWindow(1000, 1000)), 4L)); expected.add(KeyValue.pair(new Windowed<>("aaa", new SessionWindow(100, 100)), 3L)); + expected.add(KeyValue.pair(new Windowed<>("aaaa", new SessionWindow(100, 100)), 6L)); + expected.add(KeyValue.pair(new Windowed<>("b", new SessionWindow(1000, 1000)), 4L)); expected.add(KeyValue.pair(new Windowed<>("bb", new SessionWindow(1500, 2000)), 5L)); for (final KeyValue, Long> kv : expected) { sessionStore.put(kv.key, kv.value); } - // add some that should only be fetched in unlimited fetch + // add some that shouldn't appear in the results sessionStore.put(new Windowed<>("a", new SessionWindow(0, 0)), 1L); sessionStore.put(new Windowed<>("bbb", new SessionWindow(2500, 3000)), 6L); try (final KeyValueIterator, Long> values = sessionStore.fetch("aa", "bb")) { - assertEquals(new HashSet<>(expected), toSet(values)); - } - - // unlimited keyFrom fetch case - expected.add(KeyValue.pair(new Windowed<>("a", new SessionWindow(0, 0)), 1L)); - - try (final KeyValueIterator, Long> values = sessionStore.fetch(null, "bb")) { - assertEquals(new HashSet<>(expected), toSet(values)); + assertEquals(expected, toList(values)); } - // remove the one added for unlimited start fetch case - expected.remove(expected.size() - 1); - // unlimited keyTo fetch case - expected.add(KeyValue.pair(new Windowed<>("bbb", new SessionWindow(2500, 3000)), 6L)); - - try (final KeyValueIterator, Long> values = sessionStore.fetch("aa", null)) { - assertEquals(new HashSet<>(expected), toSet(values)); - } - - // fetch all case - expected.add(KeyValue.pair(new Windowed<>("a", new SessionWindow(0, 0)), 1L)); - - try (final KeyValueIterator, Long> values = sessionStore.fetch(null, null)) { - assertEquals(new HashSet<>(expected), toSet(values)); + try (final KeyValueIterator, Long> values = sessionStore.findSessions("aa", "bb", 0L, Long.MAX_VALUE)) { + assertEquals(expected, toList(values)); } } @Test public void shouldBackwardFetchAllSessionsWithinKeyRange() { - final List, Long>> expected = new ArrayList<>(); + final LinkedList, Long>> expected = new LinkedList<>(); expected.add(KeyValue.pair(new Windowed<>("aa", new SessionWindow(10, 10)), 2L)); - expected.add(KeyValue.pair(new Windowed<>("b", new SessionWindow(1000, 1000)), 4L)); expected.add(KeyValue.pair(new Windowed<>("aaa", new SessionWindow(100, 100)), 3L)); + expected.add(KeyValue.pair(new Windowed<>("aaaa", new SessionWindow(100, 100)), 6L)); + expected.add(KeyValue.pair(new Windowed<>("b", new SessionWindow(1000, 1000)), 4L)); expected.add(KeyValue.pair(new Windowed<>("bb", new SessionWindow(1500, 2000)), 5L)); for (final KeyValue, Long> kv : expected) { sessionStore.put(kv.key, kv.value); } - // add some that should only be fetched in unlimited fetch + // add some that shouldn't appear in the results sessionStore.put(new Windowed<>("a", new SessionWindow(0, 0)), 1L); sessionStore.put(new Windowed<>("bbb", new SessionWindow(2500, 3000)), 6L); try (final KeyValueIterator, Long> values = sessionStore.backwardFetch("aa", "bb")) { - assertEquals(new HashSet<>(expected), toSet(values)); - } - - // unlimited keyFrom fetch case - expected.add(KeyValue.pair(new Windowed<>("a", new SessionWindow(0, 0)), 1L)); - - try (final KeyValueIterator, Long> values = sessionStore.backwardFetch(null, "bb")) { - assertEquals(new HashSet<>(expected), toSet(values)); + assertEquals(toList(expected.descendingIterator()), toList(values)); } - // remove the one added for unlimited start fetch case - expected.remove(expected.size() - 1); - // unlimited keyTo fetch case - expected.add(KeyValue.pair(new Windowed<>("bbb", new SessionWindow(2500, 3000)), 6L)); - - try (final KeyValueIterator, Long> values = sessionStore.backwardFetch("aa", null)) { - assertEquals(new HashSet<>(expected), toSet(values)); - } - - // fetch all case - expected.add(KeyValue.pair(new Windowed<>("a", new SessionWindow(0, 0)), 1L)); - - try (final KeyValueIterator, Long> values = sessionStore.backwardFetch(null, null)) { - assertEquals(new HashSet<>(expected), toSet(values)); + try (final KeyValueIterator, Long> values = sessionStore.backwardFindSessions("aa", "bb", 0L, Long.MAX_VALUE)) { + assertEquals(toList(expected.descendingIterator()), toList(values)); } } @@ -315,7 +279,7 @@ public void shouldFindValuesWithinMergingSessionWindowRange() { KeyValue.pair(new Windowed<>(key, new SessionWindow(1000L, 1000L)), 2L)); try (final KeyValueIterator, Long> results = sessionStore.findSessions(key, -1, 1000L)) { - assertEquals(new HashSet<>(expected), toSet(results)); + assertEquals(expected, toList(results)); } } @@ -325,13 +289,12 @@ public void shouldBackwardFindValuesWithinMergingSessionWindowRange() { sessionStore.put(new Windowed<>(key, new SessionWindow(0L, 0L)), 1L); sessionStore.put(new Windowed<>(key, new SessionWindow(1000L, 1000L)), 2L); - final List, Long>> expected = asList( - KeyValue.pair(new Windowed<>(key, new SessionWindow(0L, 0L)), 1L), - KeyValue.pair(new Windowed<>(key, new SessionWindow(1000L, 1000L)), 2L) - ); + final LinkedList, Long>> expected = new LinkedList<>(); + expected.add(KeyValue.pair(new Windowed<>(key, new SessionWindow(0L, 0L)), 1L)); + expected.add(KeyValue.pair(new Windowed<>(key, new SessionWindow(1000L, 1000L)), 2L)); try (final KeyValueIterator, Long> results = sessionStore.backwardFindSessions(key, -1, 1000L)) { - assertEquals(new HashSet<>(expected), toSet(results)); + assertEquals(toList(expected.descendingIterator()), toList(results)); } } @@ -384,7 +347,7 @@ public void shouldFindSessionsToMerge() { Arrays.asList(KeyValue.pair(session2, 2L), KeyValue.pair(session3, 3L)); try (final KeyValueIterator, Long> results = sessionStore.findSessions("a", 150, 300)) { - assertEquals(new HashSet<>(expected), toSet(results)); + assertEquals(expected, toList(results)); } } @@ -402,10 +365,10 @@ public void shouldBackwardFindSessionsToMerge() { sessionStore.put(session5, 5L); final List, Long>> expected = - asList(KeyValue.pair(session2, 2L), KeyValue.pair(session3, 3L)); + asList(KeyValue.pair(session3, 3L), KeyValue.pair(session2, 2L)); try (final KeyValueIterator, Long> results = sessionStore.backwardFindSessions("a", 150, 300)) { - assertEquals(new HashSet<>(expected), toSet(results)); + assertEquals(expected, toList(results)); } } @@ -443,25 +406,7 @@ public void shouldFetchExactKeys() { try (final KeyValueIterator, Long> iterator = sessionStore.findSessions("a", "aa", 10, 0) ) { - assertThat(valuesToSet(iterator), equalTo(new HashSet<>(Collections.singletonList(2L)))); - } - - try (final KeyValueIterator, Long> iterator = - sessionStore.findSessions(null, "aa", 0, Long.MAX_VALUE) - ) { - assertThat(valuesToSet(iterator), equalTo(new HashSet<>(asList(1L, 2L, 3L, 4L, 5L)))); - } - - try (final KeyValueIterator, Long> iterator = - sessionStore.findSessions("a", null, 0, Long.MAX_VALUE) - ) { - assertThat(valuesToSet(iterator), equalTo(new HashSet<>(asList(1L, 2L, 3L, 4L, 5L)))); - } - - try (final KeyValueIterator, Long> iterator = - sessionStore.findSessions(null, null, 0, Long.MAX_VALUE) - ) { - assertThat(valuesToSet(iterator), equalTo(new HashSet<>(asList(1L, 2L, 3L, 4L, 5L)))); + assertThat(valuesToSet(iterator), equalTo(new HashSet<>(asList(2L)))); } } @@ -499,25 +444,7 @@ public void shouldBackwardFetchExactKeys() { try (final KeyValueIterator, Long> iterator = sessionStore.backwardFindSessions("a", "aa", 10, 0) ) { - assertThat(valuesToSet(iterator), equalTo(new HashSet<>(Collections.singletonList(2L)))); - } - - try (final KeyValueIterator, Long> iterator = - sessionStore.backwardFindSessions(null, "aa", 0, Long.MAX_VALUE) - ) { - assertThat(valuesToSet(iterator), equalTo(new HashSet<>(asList(1L, 2L, 3L, 4L, 5L)))); - } - - try (final KeyValueIterator, Long> iterator = - sessionStore.backwardFindSessions("a", null, 0, Long.MAX_VALUE) - ) { - assertThat(valuesToSet(iterator), equalTo(new HashSet<>(asList(1L, 2L, 3L, 4L, 5L)))); - } - - try (final KeyValueIterator, Long> iterator = - sessionStore.backwardFindSessions(null, null, 0, Long.MAX_VALUE) - ) { - assertThat(valuesToSet(iterator), equalTo(new HashSet<>(asList(1L, 2L, 3L, 4L, 5L)))); + assertThat(valuesToSet(iterator), equalTo(new HashSet<>(asList(2L)))); } } @@ -542,12 +469,20 @@ public void shouldFetchAndIterateOverExactBinaryKeys() { sessionStore.put(new Windowed<>(key2, new SessionWindow(8, 100)), "8"); sessionStore.put(new Windowed<>(key3, new SessionWindow(9, 100)), "9"); - final Set expectedKey1 = new HashSet<>(asList("1", "4", "7")); - assertThat(valuesToSet(sessionStore.findSessions(key1, 0L, Long.MAX_VALUE)), equalTo(expectedKey1)); - final Set expectedKey2 = new HashSet<>(asList("2", "5", "8")); - assertThat(valuesToSet(sessionStore.findSessions(key2, 0L, Long.MAX_VALUE)), equalTo(expectedKey2)); - final Set expectedKey3 = new HashSet<>(asList("3", "6", "9")); - assertThat(valuesToSet(sessionStore.findSessions(key3, 0L, Long.MAX_VALUE)), equalTo(expectedKey3)); + final List expectedKey1 = asList("1", "4", "7"); + try (KeyValueIterator, String> iterator = sessionStore.findSessions(key1, 0L, Long.MAX_VALUE)) { + assertThat(valuesToSet(iterator), equalTo(new HashSet<>(expectedKey1))); + } + + final List expectedKey2 = asList("2", "5", "8"); + try (KeyValueIterator, String> iterator = sessionStore.findSessions(key2, 0L, Long.MAX_VALUE)) { + assertThat(valuesToSet(iterator), equalTo(new HashSet<>(expectedKey2))); + } + + final List expectedKey3 = asList("3", "6", "9"); + try (KeyValueIterator, String> iterator = sessionStore.findSessions(key3, 0L, Long.MAX_VALUE)) { + assertThat(valuesToSet(iterator), equalTo(new HashSet<>(expectedKey3))); + } sessionStore.close(); } @@ -573,12 +508,21 @@ public void shouldBackwardFetchAndIterateOverExactBinaryKeys() { sessionStore.put(new Windowed<>(key2, new SessionWindow(8, 100)), "8"); sessionStore.put(new Windowed<>(key3, new SessionWindow(9, 100)), "9"); - final Set expectedKey1 = new HashSet<>(asList("1", "4", "7")); - assertThat(valuesToSet(sessionStore.backwardFindSessions(key1, 0L, Long.MAX_VALUE)), equalTo(expectedKey1)); - final Set expectedKey2 = new HashSet<>(asList("2", "5", "8")); - assertThat(valuesToSet(sessionStore.backwardFindSessions(key2, 0L, Long.MAX_VALUE)), equalTo(expectedKey2)); - final Set expectedKey3 = new HashSet<>(asList("3", "6", "9")); - assertThat(valuesToSet(sessionStore.backwardFindSessions(key3, 0L, Long.MAX_VALUE)), equalTo(expectedKey3)); + + final List expectedKey1 = asList("7", "4", "1"); + try (KeyValueIterator, String> iterator = sessionStore.backwardFindSessions(key1, 0L, Long.MAX_VALUE)) { + assertThat(valuesToSet(iterator), equalTo(new HashSet<>(expectedKey1))); + } + + final List expectedKey2 = asList("8", "5", "2"); + try (KeyValueIterator, String> iterator = sessionStore.backwardFindSessions(key2, 0L, Long.MAX_VALUE)) { + assertThat(valuesToSet(iterator), equalTo(new HashSet<>(expectedKey2))); + } + + final List expectedKey3 = asList("9", "6", "3"); + try (KeyValueIterator, String> iterator = sessionStore.backwardFindSessions(key3, 0L, Long.MAX_VALUE)) { + assertThat(valuesToSet(iterator), equalTo(new HashSet<>(expectedKey3))); + } sessionStore.close(); } @@ -629,13 +573,13 @@ public void shouldRestore() { } try (final KeyValueIterator, Long> values = sessionStore.fetch("a")) { - assertEquals(new HashSet<>(expected), toSet(values)); + assertEquals(expected, toList(values)); } sessionStore.close(); try (final KeyValueIterator, Long> values = sessionStore.fetch("a")) { - assertEquals(Collections.emptySet(), toSet(values)); + assertEquals(Collections.emptyList(), toList(values)); } @@ -647,7 +591,7 @@ public void shouldRestore() { context.restore(sessionStore.name(), changeLog); try (final KeyValueIterator, Long> values = sessionStore.fetch("a")) { - assertEquals(new HashSet<>(expected), toSet(values)); + assertEquals(expected, toList(values)); } } @@ -748,6 +692,26 @@ public void shouldThrowNullPointerExceptionOnFindSessionsNullKey() { assertThrows(NullPointerException.class, () -> sessionStore.findSessions(null, 1L, 2L)); } + @Test + public void shouldThrowNullPointerExceptionOnFindSessionsNullFromKey() { + assertThrows(NullPointerException.class, () -> sessionStore.findSessions(null, "anyKeyTo", 1L, 2L)); + } + + @Test + public void shouldThrowNullPointerExceptionOnFindSessionsNullToKey() { + assertThrows(NullPointerException.class, () -> sessionStore.findSessions("anyKeyFrom", null, 1L, 2L)); + } + + @Test + public void shouldThrowNullPointerExceptionOnFetchNullFromKey() { + assertThrows(NullPointerException.class, () -> sessionStore.fetch(null, "anyToKey")); + } + + @Test + public void shouldThrowNullPointerExceptionOnFetchNullToKey() { + assertThrows(NullPointerException.class, () -> sessionStore.fetch("anyFromKey", null)); + } + @Test public void shouldThrowNullPointerExceptionOnFetchNullKey() { assertThrows(NullPointerException.class, () -> sessionStore.fetch(null)); diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingInMemorySessionStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingInMemorySessionStoreTest.java index 6dbdef836305b..f8cecb56d49f4 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingInMemorySessionStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingInMemorySessionStoreTest.java @@ -150,31 +150,7 @@ public void shouldPutFetchAllKeysFromCache() { assertEquals(3, cache.size()); - try (final KeyValueIterator, byte[]> all = cachingStore.fetch(keyA, keyB)) { - verifyWindowedKeyValue(all.next(), new Windowed<>(keyA, new SessionWindow(0, 0)), "1"); - verifyWindowedKeyValue(all.next(), new Windowed<>(keyAA, new SessionWindow(0, 0)), "1"); - verifyWindowedKeyValue(all.next(), new Windowed<>(keyB, new SessionWindow(0, 0)), "1"); - assertFalse(all.hasNext()); - } - - // infinite keyFrom fetch - try (final KeyValueIterator, byte[]> all = cachingStore.fetch(null, keyB)) { - verifyWindowedKeyValue(all.next(), new Windowed<>(keyA, new SessionWindow(0, 0)), "1"); - verifyWindowedKeyValue(all.next(), new Windowed<>(keyAA, new SessionWindow(0, 0)), "1"); - verifyWindowedKeyValue(all.next(), new Windowed<>(keyB, new SessionWindow(0, 0)), "1"); - assertFalse(all.hasNext()); - } - - // infinite keyTo fetch - try (final KeyValueIterator, byte[]> all = cachingStore.fetch(null, keyB)) { - verifyWindowedKeyValue(all.next(), new Windowed<>(keyA, new SessionWindow(0, 0)), "1"); - verifyWindowedKeyValue(all.next(), new Windowed<>(keyAA, new SessionWindow(0, 0)), "1"); - verifyWindowedKeyValue(all.next(), new Windowed<>(keyB, new SessionWindow(0, 0)), "1"); - assertFalse(all.hasNext()); - } - - // infinite keyFrom and keyTo fetch - try (final KeyValueIterator, byte[]> all = cachingStore.fetch(null, keyB)) { + try (final KeyValueIterator, byte[]> all = cachingStore.findSessions(keyA, keyB, 0, 0)) { verifyWindowedKeyValue(all.next(), new Windowed<>(keyA, new SessionWindow(0, 0)), "1"); verifyWindowedKeyValue(all.next(), new Windowed<>(keyAA, new SessionWindow(0, 0)), "1"); verifyWindowedKeyValue(all.next(), new Windowed<>(keyB, new SessionWindow(0, 0)), "1"); @@ -190,31 +166,7 @@ public void shouldPutBackwardFetchAllKeysFromCache() { assertEquals(3, cache.size()); - try (final KeyValueIterator, byte[]> all = cachingStore.backwardFetch(keyA, keyB)) { - verifyWindowedKeyValue(all.next(), new Windowed<>(keyB, new SessionWindow(0, 0)), "1"); - verifyWindowedKeyValue(all.next(), new Windowed<>(keyAA, new SessionWindow(0, 0)), "1"); - verifyWindowedKeyValue(all.next(), new Windowed<>(keyA, new SessionWindow(0, 0)), "1"); - assertFalse(all.hasNext()); - } - - // infinite keyFrom fetch - try (final KeyValueIterator, byte[]> all = cachingStore.backwardFetch(null, keyB)) { - verifyWindowedKeyValue(all.next(), new Windowed<>(keyB, new SessionWindow(0, 0)), "1"); - verifyWindowedKeyValue(all.next(), new Windowed<>(keyAA, new SessionWindow(0, 0)), "1"); - verifyWindowedKeyValue(all.next(), new Windowed<>(keyA, new SessionWindow(0, 0)), "1"); - assertFalse(all.hasNext()); - } - - // infinite keyTo fetch - try (final KeyValueIterator, byte[]> all = cachingStore.backwardFetch(null, keyB)) { - verifyWindowedKeyValue(all.next(), new Windowed<>(keyB, new SessionWindow(0, 0)), "1"); - verifyWindowedKeyValue(all.next(), new Windowed<>(keyAA, new SessionWindow(0, 0)), "1"); - verifyWindowedKeyValue(all.next(), new Windowed<>(keyA, new SessionWindow(0, 0)), "1"); - assertFalse(all.hasNext()); - } - - // infinite keyFrom and keyTo fetch - try (final KeyValueIterator, byte[]> all = cachingStore.backwardFetch(null, null)) { + try (final KeyValueIterator, byte[]> all = cachingStore.backwardFindSessions(keyA, keyB, 0, 0)) { verifyWindowedKeyValue(all.next(), new Windowed<>(keyB, new SessionWindow(0, 0)), "1"); verifyWindowedKeyValue(all.next(), new Windowed<>(keyAA, new SessionWindow(0, 0)), "1"); verifyWindowedKeyValue(all.next(), new Windowed<>(keyA, new SessionWindow(0, 0)), "1"); @@ -289,33 +241,7 @@ public void shouldPutFetchRangeFromCache() { assertEquals(3, cache.size()); - try (final KeyValueIterator, byte[]> some = - cachingStore.findSessions(keyAA, keyB, 0, 0)) { - verifyWindowedKeyValue(some.next(), new Windowed<>(keyAA, new SessionWindow(0, 0)), "1"); - verifyWindowedKeyValue(some.next(), new Windowed<>(keyB, new SessionWindow(0, 0)), "1"); - assertFalse(some.hasNext()); - } - - // infinite keyFrom case - try (final KeyValueIterator, byte[]> some = - cachingStore.findSessions(null, keyAA, 0, 0)) { - verifyWindowedKeyValue(some.next(), new Windowed<>(keyA, new SessionWindow(0, 0)), "1"); - verifyWindowedKeyValue(some.next(), new Windowed<>(keyAA, new SessionWindow(0, 0)), "1"); - assertFalse(some.hasNext()); - } - - // infinite keyTo case - try (final KeyValueIterator, byte[]> some = - cachingStore.findSessions(keyAA, keyB, 0, 0)) { - verifyWindowedKeyValue(some.next(), new Windowed<>(keyAA, new SessionWindow(0, 0)), "1"); - verifyWindowedKeyValue(some.next(), new Windowed<>(keyB, new SessionWindow(0, 0)), "1"); - assertFalse(some.hasNext()); - } - - // infinite keyFrom and keyTo case - try (final KeyValueIterator, byte[]> some = - cachingStore.findSessions(null, null, 0, 0)) { - verifyWindowedKeyValue(some.next(), new Windowed<>(keyA, new SessionWindow(0, 0)), "1"); + try (final KeyValueIterator, byte[]> some = cachingStore.findSessions(keyAA, keyB, 0, 0)) { verifyWindowedKeyValue(some.next(), new Windowed<>(keyAA, new SessionWindow(0, 0)), "1"); verifyWindowedKeyValue(some.next(), new Windowed<>(keyB, new SessionWindow(0, 0)), "1"); assertFalse(some.hasNext()); @@ -330,35 +256,9 @@ public void shouldPutBackwardFetchRangeFromCache() { assertEquals(3, cache.size()); - try (final KeyValueIterator, byte[]> some = - cachingStore.backwardFindSessions(keyAA, keyB, 0, 0)) { - verifyWindowedKeyValue(some.next(), new Windowed<>(keyB, new SessionWindow(0, 0)), "1"); - verifyWindowedKeyValue(some.next(), new Windowed<>(keyAA, new SessionWindow(0, 0)), "1"); - assertFalse(some.hasNext()); - } - - // infinite keyFrom case - try (final KeyValueIterator, byte[]> some = - cachingStore.backwardFindSessions(null, keyAA, 0, 0)) { - verifyWindowedKeyValue(some.next(), new Windowed<>(keyAA, new SessionWindow(0, 0)), "1"); - verifyWindowedKeyValue(some.next(), new Windowed<>(keyA, new SessionWindow(0, 0)), "1"); - assertFalse(some.hasNext()); - } - - // infinite keyTo case - try (final KeyValueIterator, byte[]> some = - cachingStore.backwardFindSessions(keyAA, keyB, 0, 0)) { - verifyWindowedKeyValue(some.next(), new Windowed<>(keyB, new SessionWindow(0, 0)), "1"); - verifyWindowedKeyValue(some.next(), new Windowed<>(keyAA, new SessionWindow(0, 0)), "1"); - assertFalse(some.hasNext()); - } - - // infinite keyFrom and keyTo case - try (final KeyValueIterator, byte[]> some = - cachingStore.backwardFindSessions(null, null, 0, 0)) { + try (final KeyValueIterator, byte[]> some = cachingStore.backwardFindSessions(keyAA, keyB, 0, 0)) { verifyWindowedKeyValue(some.next(), new Windowed<>(keyB, new SessionWindow(0, 0)), "1"); verifyWindowedKeyValue(some.next(), new Windowed<>(keyAA, new SessionWindow(0, 0)), "1"); - verifyWindowedKeyValue(some.next(), new Windowed<>(keyA, new SessionWindow(0, 0)), "1"); assertFalse(some.hasNext()); } } @@ -434,7 +334,7 @@ public void shouldRemove() { cachingStore.remove(a); try (final KeyValueIterator, byte[]> rangeIter = - cachingStore.findSessions(keyA, 0, 0)) { + cachingStore.findSessions(keyA, 0, 0)) { assertFalse(rangeIter.hasNext()); assertNull(cachingStore.fetchSession(keyA, 0, 0)); @@ -746,6 +646,26 @@ public void shouldThrowNullPointerExceptionOnFindSessionsNullKey() { assertThrows(NullPointerException.class, () -> cachingStore.findSessions(null, 1L, 2L)); } + @Test + public void shouldThrowNullPointerExceptionOnFindSessionsNullFromKey() { + assertThrows(NullPointerException.class, () -> cachingStore.findSessions(null, keyA, 1L, 2L)); + } + + @Test + public void shouldThrowNullPointerExceptionOnFindSessionsNullToKey() { + assertThrows(NullPointerException.class, () -> cachingStore.findSessions(keyA, null, 1L, 2L)); + } + + @Test + public void shouldThrowNullPointerExceptionOnFetchNullFromKey() { + assertThrows(NullPointerException.class, () -> cachingStore.fetch(null, keyA)); + } + + @Test + public void shouldThrowNullPointerExceptionOnFetchNullToKey() { + assertThrows(NullPointerException.class, () -> cachingStore.fetch(keyA, null)); + } + @Test public void shouldThrowNullPointerExceptionOnFetchNullKey() { assertThrows(NullPointerException.class, () -> cachingStore.fetch(null)); diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingPersistentSessionStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingPersistentSessionStoreTest.java index cbf4331c39747..9af329df538bc 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingPersistentSessionStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingPersistentSessionStoreTest.java @@ -133,34 +133,7 @@ public void shouldPutFetchAllKeysFromCache() { assertEquals(3, cache.size()); try (final KeyValueIterator, byte[]> all = - cachingStore.fetch(keyA, keyB)) { - verifyWindowedKeyValue(all.next(), new Windowed<>(keyA, new SessionWindow(0, 0)), "1"); - verifyWindowedKeyValue(all.next(), new Windowed<>(keyAA, new SessionWindow(0, 0)), "1"); - verifyWindowedKeyValue(all.next(), new Windowed<>(keyB, new SessionWindow(0, 0)), "1"); - assertFalse(all.hasNext()); - } - - // infinite keyFrom fetch - try (final KeyValueIterator, byte[]> all = - cachingStore.fetch(null, keyB)) { - verifyWindowedKeyValue(all.next(), new Windowed<>(keyA, new SessionWindow(0, 0)), "1"); - verifyWindowedKeyValue(all.next(), new Windowed<>(keyAA, new SessionWindow(0, 0)), "1"); - verifyWindowedKeyValue(all.next(), new Windowed<>(keyB, new SessionWindow(0, 0)), "1"); - assertFalse(all.hasNext()); - } - - // infinite keyTo fetch - try (final KeyValueIterator, byte[]> all = - cachingStore.fetch(keyA, null)) { - verifyWindowedKeyValue(all.next(), new Windowed<>(keyA, new SessionWindow(0, 0)), "1"); - verifyWindowedKeyValue(all.next(), new Windowed<>(keyAA, new SessionWindow(0, 0)), "1"); - verifyWindowedKeyValue(all.next(), new Windowed<>(keyB, new SessionWindow(0, 0)), "1"); - assertFalse(all.hasNext()); - } - - // infinite keyFrom and keyTo fetch - try (final KeyValueIterator, byte[]> all = - cachingStore.fetch(null, null)) { + cachingStore.findSessions(keyA, keyB, 0, 0)) { verifyWindowedKeyValue(all.next(), new Windowed<>(keyA, new SessionWindow(0, 0)), "1"); verifyWindowedKeyValue(all.next(), new Windowed<>(keyAA, new SessionWindow(0, 0)), "1"); verifyWindowedKeyValue(all.next(), new Windowed<>(keyB, new SessionWindow(0, 0)), "1"); @@ -177,34 +150,7 @@ public void shouldPutBackwardFetchAllKeysFromCache() { assertEquals(3, cache.size()); try (final KeyValueIterator, byte[]> all = - cachingStore.backwardFetch(keyA, keyB)) { - verifyWindowedKeyValue(all.next(), new Windowed<>(keyB, new SessionWindow(0, 0)), "1"); - verifyWindowedKeyValue(all.next(), new Windowed<>(keyAA, new SessionWindow(0, 0)), "1"); - verifyWindowedKeyValue(all.next(), new Windowed<>(keyA, new SessionWindow(0, 0)), "1"); - assertFalse(all.hasNext()); - } - - // infinite keyFrom fetch - try (final KeyValueIterator, byte[]> all = - cachingStore.backwardFetch(null, keyB)) { - verifyWindowedKeyValue(all.next(), new Windowed<>(keyB, new SessionWindow(0, 0)), "1"); - verifyWindowedKeyValue(all.next(), new Windowed<>(keyAA, new SessionWindow(0, 0)), "1"); - verifyWindowedKeyValue(all.next(), new Windowed<>(keyA, new SessionWindow(0, 0)), "1"); - assertFalse(all.hasNext()); - } - - // infinite keyTo fetch - try (final KeyValueIterator, byte[]> all = - cachingStore.backwardFetch(keyA, null)) { - verifyWindowedKeyValue(all.next(), new Windowed<>(keyB, new SessionWindow(0, 0)), "1"); - verifyWindowedKeyValue(all.next(), new Windowed<>(keyAA, new SessionWindow(0, 0)), "1"); - verifyWindowedKeyValue(all.next(), new Windowed<>(keyA, new SessionWindow(0, 0)), "1"); - assertFalse(all.hasNext()); - } - - // infinite keyFrom and keyTo fetch - try (final KeyValueIterator, byte[]> all = - cachingStore.backwardFetch(null, null)) { + cachingStore.backwardFindSessions(keyA, keyB, 0, 0)) { verifyWindowedKeyValue(all.next(), new Windowed<>(keyB, new SessionWindow(0, 0)), "1"); verifyWindowedKeyValue(all.next(), new Windowed<>(keyAA, new SessionWindow(0, 0)), "1"); verifyWindowedKeyValue(all.next(), new Windowed<>(keyA, new SessionWindow(0, 0)), "1"); @@ -287,31 +233,6 @@ public void shouldPutFetchRangeFromCache() { verifyWindowedKeyValue(some.next(), new Windowed<>(keyB, new SessionWindow(0, 0)), "1"); assertFalse(some.hasNext()); } - - // infinite keyFrom case - try (final KeyValueIterator, byte[]> some = - cachingStore.findSessions(null, keyAA, 0, 0)) { - verifyWindowedKeyValue(some.next(), new Windowed<>(keyA, new SessionWindow(0, 0)), "1"); - verifyWindowedKeyValue(some.next(), new Windowed<>(keyAA, new SessionWindow(0, 0)), "1"); - assertFalse(some.hasNext()); - } - - // infinite keyTo case - try (final KeyValueIterator, byte[]> some = - cachingStore.findSessions(keyAA, keyB, 0, 0)) { - verifyWindowedKeyValue(some.next(), new Windowed<>(keyAA, new SessionWindow(0, 0)), "1"); - verifyWindowedKeyValue(some.next(), new Windowed<>(keyB, new SessionWindow(0, 0)), "1"); - assertFalse(some.hasNext()); - } - - // infinite keyFrom and keyTo case - try (final KeyValueIterator, byte[]> some = - cachingStore.findSessions(null, null, 0, 0)) { - verifyWindowedKeyValue(some.next(), new Windowed<>(keyA, new SessionWindow(0, 0)), "1"); - verifyWindowedKeyValue(some.next(), new Windowed<>(keyAA, new SessionWindow(0, 0)), "1"); - verifyWindowedKeyValue(some.next(), new Windowed<>(keyB, new SessionWindow(0, 0)), "1"); - assertFalse(some.hasNext()); - } } @Test @@ -328,31 +249,6 @@ public void shouldPutBackwardFetchRangeFromCache() { verifyWindowedKeyValue(some.next(), new Windowed<>(keyAA, new SessionWindow(0, 0)), "1"); assertFalse(some.hasNext()); } - - // infinite keyFrom case - try (final KeyValueIterator, byte[]> some = - cachingStore.backwardFindSessions(null, keyAA, 0, 0)) { - verifyWindowedKeyValue(some.next(), new Windowed<>(keyAA, new SessionWindow(0, 0)), "1"); - verifyWindowedKeyValue(some.next(), new Windowed<>(keyA, new SessionWindow(0, 0)), "1"); - assertFalse(some.hasNext()); - } - - // infinite keyTo case - try (final KeyValueIterator, byte[]> some = - cachingStore.backwardFindSessions(keyAA, keyB, 0, 0)) { - verifyWindowedKeyValue(some.next(), new Windowed<>(keyB, new SessionWindow(0, 0)), "1"); - verifyWindowedKeyValue(some.next(), new Windowed<>(keyAA, new SessionWindow(0, 0)), "1"); - assertFalse(some.hasNext()); - } - - // infinite keyFrom and keyTo case - try (final KeyValueIterator, byte[]> some = - cachingStore.backwardFindSessions(null, null, 0, 0)) { - verifyWindowedKeyValue(some.next(), new Windowed<>(keyB, new SessionWindow(0, 0)), "1"); - verifyWindowedKeyValue(some.next(), new Windowed<>(keyAA, new SessionWindow(0, 0)), "1"); - verifyWindowedKeyValue(some.next(), new Windowed<>(keyA, new SessionWindow(0, 0)), "1"); - assertFalse(some.hasNext()); - } } @Test @@ -757,6 +653,26 @@ public void shouldThrowNullPointerExceptionOnFindSessionsNullKey() { assertThrows(NullPointerException.class, () -> cachingStore.findSessions(null, 1L, 2L)); } + @Test + public void shouldThrowNullPointerExceptionOnFindSessionsNullFromKey() { + assertThrows(NullPointerException.class, () -> cachingStore.findSessions(null, keyA, 1L, 2L)); + } + + @Test + public void shouldThrowNullPointerExceptionOnFindSessionsNullToKey() { + assertThrows(NullPointerException.class, () -> cachingStore.findSessions(keyA, null, 1L, 2L)); + } + + @Test + public void shouldThrowNullPointerExceptionOnFetchNullFromKey() { + assertThrows(NullPointerException.class, () -> cachingStore.fetch(null, keyA)); + } + + @Test + public void shouldThrowNullPointerExceptionOnFetchNullToKey() { + assertThrows(NullPointerException.class, () -> cachingStore.fetch(keyA, null)); + } + @Test public void shouldThrowNullPointerExceptionOnFetchNullKey() { assertThrows(NullPointerException.class, () -> cachingStore.fetch(null)); diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/CompositeReadOnlySessionStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/CompositeReadOnlySessionStoreTest.java index c2d38def4a91d..66ae2431c5938 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/CompositeReadOnlySessionStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/CompositeReadOnlySessionStoreTest.java @@ -146,52 +146,21 @@ public void shouldFetchKeyRangeAcrossStores() { underlyingSessionStore.put(new Windowed<>("a", new SessionWindow(0, 0)), 0L); secondUnderlying.put(new Windowed<>("b", new SessionWindow(0, 0)), 10L); final List, Long>> results = StreamsTestUtils.toList(sessionStore.fetch("a", "b")); - assertThat(results, equalTo(Arrays.asList( - KeyValue.pair(new Windowed<>("a", new SessionWindow(0, 0)), 0L), - KeyValue.pair(new Windowed<>("b", new SessionWindow(0, 0)), 10L)))); + assertThat(results.size(), equalTo(2)); } @Test - public void shouldFetchKeyRangeAcrossStoresWithNullKeyFrom() { - final ReadOnlySessionStoreStub secondUnderlying = new - ReadOnlySessionStoreStub<>(); - stubProviderTwo.addStore(storeName, secondUnderlying); - underlyingSessionStore.put(new Windowed<>("a", new SessionWindow(0, 0)), 0L); - secondUnderlying.put(new Windowed<>("b", new SessionWindow(0, 0)), 10L); - final List, Long>> results = StreamsTestUtils.toList(sessionStore.fetch(null, "b")); - assertThat(results, equalTo(Arrays.asList( - KeyValue.pair(new Windowed<>("a", new SessionWindow(0, 0)), 0L), - KeyValue.pair(new Windowed<>("b", new SessionWindow(0, 0)), 10L)))); - } - - @Test - public void shouldFetchKeyRangeAcrossStoresWithNullKeyTo() { - final ReadOnlySessionStoreStub secondUnderlying = new - ReadOnlySessionStoreStub<>(); - stubProviderTwo.addStore(storeName, secondUnderlying); - underlyingSessionStore.put(new Windowed<>("a", new SessionWindow(0, 0)), 0L); - secondUnderlying.put(new Windowed<>("b", new SessionWindow(0, 0)), 10L); - final List, Long>> results = StreamsTestUtils.toList(sessionStore.fetch("a", null)); - assertThat(results, equalTo(Arrays.asList( - KeyValue.pair(new Windowed<>("a", new SessionWindow(0, 0)), 0L), - KeyValue.pair(new Windowed<>("b", new SessionWindow(0, 0)), 10L)))); + public void shouldThrowNPEIfKeyIsNull() { + assertThrows(NullPointerException.class, () -> underlyingSessionStore.fetch(null)); } @Test - public void shouldFetchKeyRangeAcrossStoresWithNullKeyFromKeyTo() { - final ReadOnlySessionStoreStub secondUnderlying = new - ReadOnlySessionStoreStub<>(); - stubProviderTwo.addStore(storeName, secondUnderlying); - underlyingSessionStore.put(new Windowed<>("a", new SessionWindow(0, 0)), 0L); - secondUnderlying.put(new Windowed<>("b", new SessionWindow(0, 0)), 10L); - final List, Long>> results = StreamsTestUtils.toList(sessionStore.fetch(null, null)); - assertThat(results, equalTo(Arrays.asList( - KeyValue.pair(new Windowed<>("a", new SessionWindow(0, 0)), 0L), - KeyValue.pair(new Windowed<>("b", new SessionWindow(0, 0)), 10L)))); + public void shouldThrowNPEIfFromKeyIsNull() { + assertThrows(NullPointerException.class, () -> underlyingSessionStore.fetch(null, "a")); } @Test - public void shouldThrowNPEIfKeyIsNull() { - assertThrows(NullPointerException.class, () -> underlyingSessionStore.fetch(null)); + public void shouldThrowNPEIfToKeyIsNull() { + assertThrows(NullPointerException.class, () -> underlyingSessionStore.fetch("a", null)); } } diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/SessionStoreFetchTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/SessionStoreFetchTest.java new file mode 100644 index 0000000000000..1e274a68a3af6 --- /dev/null +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/SessionStoreFetchTest.java @@ -0,0 +1,232 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.streams.state.internals; + +import org.apache.kafka.common.serialization.Serdes; +import org.apache.kafka.common.serialization.StringSerializer; +import org.apache.kafka.common.utils.Bytes; +import org.apache.kafka.streams.KeyValue; +import org.apache.kafka.streams.StreamsBuilder; +import org.apache.kafka.streams.StreamsConfig; +import org.apache.kafka.streams.TestInputTopic; +import org.apache.kafka.streams.Topology; +import org.apache.kafka.streams.TopologyTestDriver; +import org.apache.kafka.streams.kstream.Consumed; +import org.apache.kafka.streams.kstream.Grouped; +import org.apache.kafka.streams.kstream.KStream; +import org.apache.kafka.streams.kstream.Materialized; +import org.apache.kafka.streams.kstream.SessionWindows; +import org.apache.kafka.streams.kstream.Windowed; +import org.apache.kafka.streams.kstream.internals.SessionWindow; +import org.apache.kafka.streams.state.KeyValueIterator; +import org.apache.kafka.streams.state.SessionBytesStoreSupplier; +import org.apache.kafka.streams.state.SessionStore; +import org.apache.kafka.streams.state.Stores; +import org.apache.kafka.test.TestUtils; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TestName; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +import java.time.Duration; +import java.util.Arrays; +import java.util.Collection; +import java.util.HashMap; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.List; +import java.util.Properties; +import java.util.function.Supplier; + +import static java.time.Duration.ofMillis; +import static org.apache.kafka.common.utils.Utils.mkEntry; +import static org.apache.kafka.common.utils.Utils.mkMap; +import static org.apache.kafka.common.utils.Utils.mkProperties; + +@RunWith(Parameterized.class) +public class SessionStoreFetchTest { + private enum StoreType { InMemory, RocksDB }; + private static final String STORE_NAME = "store"; + private static final int DATA_SIZE = 5; + private static final long WINDOW_SIZE = 500L; + private static final long RETENTION_MS = 10000L; + + private StoreType storeType; + private boolean enableLogging; + private boolean enableCaching; + private boolean forward; + + private LinkedList, Long>> expectedRecords; + private LinkedList> records; + private Properties streamsConfig; + + public SessionStoreFetchTest(final StoreType storeType, final boolean enableLogging, final boolean enableCaching, final boolean forward) { + this.storeType = storeType; + this.enableLogging = enableLogging; + this.enableCaching = enableCaching; + this.forward = forward; + + this.records = new LinkedList<>(); + this.expectedRecords = new LinkedList<>(); + final int m = DATA_SIZE / 2; + for (int i = 0; i < DATA_SIZE; i++) { + final String keyStr = i < m ? "a" : "b"; + final String key = "key-" + keyStr; + final String key2 = "key-" + keyStr + keyStr; + final String value = "val-" + i; + final KeyValue r = new KeyValue<>(key, value); + final KeyValue r2 = new KeyValue<>(key2, value); + records.add(r); + records.add(r2); + } + expectedRecords.add(new KeyValue<>(new Windowed<>("key-a", new SessionWindow(0, 500)), 4L)); + expectedRecords.add(new KeyValue<>(new Windowed<>("key-aa", new SessionWindow(0, 500)), 4L)); + expectedRecords.add(new KeyValue<>(new Windowed<>("key-b", new SessionWindow(1500, 2000)), 6L)); + expectedRecords.add(new KeyValue<>(new Windowed<>("key-bb", new SessionWindow(1500, 2000)), 6L)); + } + + @Rule + public TestName testName = new TestName(); + + @Parameterized.Parameters(name = "storeType={0}, enableLogging={1}, enableCaching={2}, forward={3}") + public static Collection data() { + final List types = Arrays.asList(StoreType.InMemory, StoreType.RocksDB); + final List logging = Arrays.asList(true, false); + final List caching = Arrays.asList(true, false); + final List forward = Arrays.asList(true, false); + return buildParameters(types, logging, caching, forward); + } + + @Before + public void setup() { + streamsConfig = mkProperties(mkMap( + mkEntry(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath()) + )); + } + + @Test + public void testStoreConfig() { + final Materialized> stateStoreConfig = getStoreConfig(storeType, STORE_NAME, enableLogging, enableCaching); + //Create topology: table from input topic + final StreamsBuilder builder = new StreamsBuilder(); + + final KStream stream = builder.stream("input", Consumed.with(Serdes.String(), Serdes.String())); + stream. + groupByKey(Grouped.with(Serdes.String(), Serdes.String())) + .windowedBy(SessionWindows.ofInactivityGapWithNoGrace(ofMillis(WINDOW_SIZE))) + .count(stateStoreConfig) + .toStream() + .to("output"); + + final Topology topology = builder.build(); + + try (final TopologyTestDriver driver = new TopologyTestDriver(topology)) { + //get input topic and stateStore + final TestInputTopic input = driver + .createInputTopic("input", new StringSerializer(), new StringSerializer()); + final SessionStore stateStore = driver.getSessionStore(STORE_NAME); + + //write some data + final int medium = DATA_SIZE / 2 * 2; + for (int i = 0; i < records.size(); i++) { + final KeyValue kv = records.get(i); + final long windowStartTime = i < medium ? 0 : 1500; + input.pipeInput(kv.key, kv.value, windowStartTime); + input.pipeInput(kv.key, kv.value, windowStartTime + WINDOW_SIZE); + } + + // query the state store + try (final KeyValueIterator, Long> scanIterator = forward ? + stateStore.fetch("key-a", "key-bb") : + stateStore.backwardFetch("key-a", "key-bb")) { + + final Iterator, Long>> dataIterator = forward ? + expectedRecords.iterator() : + expectedRecords.descendingIterator(); + + TestUtils.checkEquals(scanIterator, dataIterator); + } + + try (final KeyValueIterator, Long> scanIterator = forward ? + stateStore.findSessions("key-a", "key-bb", 0L, Long.MAX_VALUE) : + stateStore.backwardFindSessions("key-a", "key-bb", 0L, Long.MAX_VALUE)) { + + final Iterator, Long>> dataIterator = forward ? + expectedRecords.iterator() : + expectedRecords.descendingIterator(); + + TestUtils.checkEquals(scanIterator, dataIterator); + } + } + } + + private static Collection buildParameters(final List... argOptions) { + List result = new LinkedList<>(); + result.add(new Object[0]); + + for (final List argOption : argOptions) { + result = times(result, argOption); + } + + return result; + } + + private static List times(final List left, final List right) { + final List result = new LinkedList<>(); + for (final Object[] args : left) { + for (final Object rightElem : right) { + final Object[] resArgs = new Object[args.length + 1]; + System.arraycopy(args, 0, resArgs, 0, args.length); + resArgs[args.length] = rightElem; + result.add(resArgs); + } + } + return result; + } + + private Materialized> getStoreConfig(final StoreType type, final String name, final boolean cachingEnabled, final boolean loggingEnabled) { + final Supplier createStore = () -> { + if (type == StoreType.InMemory) { + return Stores.inMemorySessionStore(STORE_NAME, Duration.ofMillis(RETENTION_MS)); + } else if (type == StoreType.RocksDB) { + return Stores.persistentSessionStore(STORE_NAME, Duration.ofMillis(RETENTION_MS)); + } else { + return Stores.inMemorySessionStore(STORE_NAME, Duration.ofMillis(RETENTION_MS)); + } + }; + + final SessionBytesStoreSupplier stateStoreSupplier = createStore.get(); + final Materialized> stateStoreConfig = Materialized + .as(stateStoreSupplier) + .withKeySerde(Serdes.String()) + .withValueSerde(Serdes.Long()); + if (cachingEnabled) { + stateStoreConfig.withCachingEnabled(); + } else { + stateStoreConfig.withCachingDisabled(); + } + if (loggingEnabled) { + stateStoreConfig.withLoggingEnabled(new HashMap()); + } else { + stateStoreConfig.withLoggingDisabled(); + } + return stateStoreConfig; + } +} diff --git a/streams/src/test/java/org/apache/kafka/test/ReadOnlySessionStoreStub.java b/streams/src/test/java/org/apache/kafka/test/ReadOnlySessionStoreStub.java index 61ea1ff0fd374..06640559cb4bb 100644 --- a/streams/src/test/java/org/apache/kafka/test/ReadOnlySessionStoreStub.java +++ b/streams/src/test/java/org/apache/kafka/test/ReadOnlySessionStoreStub.java @@ -94,13 +94,11 @@ public KeyValueIterator, V> fetch(final K keyFrom, final K keyTo) { if (!open) { throw new InvalidStateStoreException("not open"); } - - NavigableMap, V>>> subSessionsMap = getSubSessionsMap(keyFrom, keyTo); - - if (subSessionsMap.isEmpty()) { + if (sessions.subMap(keyFrom, true, keyTo, true).isEmpty()) { return new KeyValueIteratorStub<>(Collections., V>>emptyIterator()); } - final Iterator, V>>> keysIterator = subSessionsMap.values().iterator(); + final Iterator, V>>> keysIterator = sessions.subMap(keyFrom, true, + keyTo, true).values().iterator(); return new KeyValueIteratorStub<>( new Iterator, V>>() { @@ -126,33 +124,16 @@ public KeyValue, V> next() { ); } - private NavigableMap, V>>> getSubSessionsMap(final K keyFrom, final K keyTo) { - final NavigableMap, V>>> subSessionsMap; - if (keyFrom == null && keyTo == null) { // fetch all - subSessionsMap = sessions; - } else if (keyFrom == null) { - subSessionsMap = sessions.headMap(keyTo, true); - } else if (keyTo == null) { - subSessionsMap = sessions.tailMap(keyFrom, true); - } else { - subSessionsMap = sessions.subMap(keyFrom, true, keyTo, true); - } - return subSessionsMap; - } - @Override public KeyValueIterator, V> backwardFetch(K keyFrom, K keyTo) { if (!open) { throw new InvalidStateStoreException("not open"); } - - NavigableMap, V>>> subSessionsMap = getSubSessionsMap(keyFrom, keyTo); - - if (subSessionsMap.isEmpty()) { + if (sessions.subMap(keyFrom, true, keyTo, true).isEmpty()) { return new KeyValueIteratorStub<>(Collections.emptyIterator()); } - - final Iterator, V>>> keysIterator = subSessionsMap.descendingMap().values().iterator(); + final Iterator, V>>> keysIterator = + sessions.subMap(keyFrom, true, keyTo, true).descendingMap().values().iterator(); return new KeyValueIteratorStub<>( new Iterator, V>>() {