diff --git a/streams/src/main/java/org/apache/kafka/streams/state/ReadOnlySessionStore.java b/streams/src/main/java/org/apache/kafka/streams/state/ReadOnlySessionStore.java index 230d2576178da..8874908d18072 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/ReadOnlySessionStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/ReadOnlySessionStore.java @@ -28,31 +28,153 @@ * @param the aggregated value type */ public interface ReadOnlySessionStore { + /** - * Retrieve all aggregated sessions for the provided key. + * Fetch any sessions with the matching key and the sessions end is ≥ earliestSessionEndTime and the sessions + * start is ≤ latestSessionStartTime iterating from earliest to latest. + *

+ * This iterator must be closed after use. + * + * @param key the key to return sessions for + * @param earliestSessionEndTime the end timestamp of the earliest session to search for, where iteration starts. + * @param latestSessionStartTime the end timestamp of the latest session to search for, where iteration ends. + * @return iterator of sessions with the matching key and aggregated values, from earliest to latest session time. + * @throws NullPointerException If null is used for key. + */ + default KeyValueIterator, AGG> findSessions(final K key, + final long earliestSessionEndTime, + final long latestSessionStartTime) { + throw new UnsupportedOperationException("This API is not supported by this implementation of ReadOnlySessionStore."); + } + + /** + * Fetch any sessions with the matching key and the sessions end is ≥ earliestSessionEndTime and the sessions + * start is ≤ latestSessionStartTime iterating from latest to earliest. + *

+ * This iterator must be closed after use. + * + * @param key the key to return sessions for + * @param earliestSessionEndTime the end timestamp of the earliest session to search for, where iteration ends. + * @param latestSessionStartTime the end timestamp of the latest session to search for, where iteration starts. + * @return backward iterator of sessions with the matching key and aggregated values, from latest to earliest session time. + * @throws NullPointerException If null is used for key. + */ + default KeyValueIterator, AGG> backwardFindSessions(final K key, + final long earliestSessionEndTime, + final long latestSessionStartTime) { + throw new UnsupportedOperationException("This API is not supported by this implementation of ReadOnlySessionStore."); + } + + /** + * Fetch any sessions in the given range of keys and the sessions end is ≥ earliestSessionEndTime and the sessions + * start is ≤ latestSessionStartTime iterating from earliest to latest. + *

+ * This iterator must be closed after use. + * + * @param keyFrom The first key that could be in the range + * @param keyTo The last key that could be in the range + * @param earliestSessionEndTime the end timestamp of the earliest session to search for, where iteration starts. + * @param latestSessionStartTime the end timestamp of the latest session to search for, where iteration ends. + * @return iterator of sessions with the matching keys and aggregated values, from earliest to latest session time. + * @throws NullPointerException If null is used for any key. + */ + default KeyValueIterator, AGG> findSessions(final K keyFrom, + final K keyTo, + final long earliestSessionEndTime, + final long latestSessionStartTime) { + throw new UnsupportedOperationException("This API is not supported by this implementation of ReadOnlySessionStore."); + } + + + /** + * Fetch any sessions in the given range of keys and the sessions end is ≥ earliestSessionEndTime and the sessions + * start is ≤ latestSessionStartTime iterating from latest to earliest. + *

* This iterator must be closed after use. * + * @param keyFrom The first key that could be in the range + * @param keyTo The last key that could be in the range + * @param earliestSessionEndTime the end timestamp of the earliest session to search for, where iteration ends. + * @param latestSessionStartTime the end timestamp of the latest session to search for, where iteration starts. + * @return backward iterator of sessions with the matching keys and aggregated values, from latest to earliest session time. + * @throws NullPointerException If null is used for any key. + */ + default KeyValueIterator, AGG> backwardFindSessions(final K keyFrom, + final K keyTo, + final long earliestSessionEndTime, + final long latestSessionStartTime) { + throw new UnsupportedOperationException("This API is not supported by this implementation of ReadOnlySessionStore."); + } + + /** + * Get the value of key from a single session. + * + * @param key the key to fetch + * @param startTime start timestamp of the session + * @param endTime end timestamp of the session + * @return The value or {@code null} if no session associated with the key can be found + * @throws NullPointerException If {@code null} is used for any key. + */ + default AGG fetchSession(final K key, final long startTime, final long endTime) { + throw new UnsupportedOperationException("This API is not supported by this implementation of ReadOnlySessionStore."); + } + + /** + * Retrieve all aggregated sessions for the provided key. + * This iterator must be closed after use. + *

* For each key, the iterator guarantees ordering of sessions, starting from the oldest/earliest * available session to the newest/latest session. * * @param key record key to find aggregated session values for - * @return KeyValueIterator containing all sessions for the provided key. + * @return KeyValueIterator containing all sessions for the provided key, from oldest to newest session. * @throws NullPointerException If null is used for key. * */ KeyValueIterator, AGG> fetch(final K key); /** - * Retrieve all aggregated sessions for the given range of keys. + * Retrieve all aggregated sessions for the provided key. * This iterator must be closed after use. + *

+ * For each key, the iterator guarantees ordering of sessions, starting from the newest/latest + * available session to the oldest/earliest session. * + * @param key record key to find aggregated session values for + * @return backward KeyValueIterator containing all sessions for the provided key, from newest to oldest session. + * @throws NullPointerException If null is used for key. + */ + default KeyValueIterator, AGG> backwardFetch(final K key) { + throw new UnsupportedOperationException("This API is not supported by this implementation of ReadOnlySessionStore."); + } + + /** + * Retrieve all aggregated sessions for the given range of keys. + * This iterator must be closed after use. + *

* For each key, the iterator guarantees ordering of sessions, starting from the oldest/earliest * available session to the newest/latest session. * * @param from first key in the range to find aggregated session values for * @param to last key in the range to find aggregated session values for - * @return KeyValueIterator containing all sessions for the provided key. + * @return KeyValueIterator containing all sessions for the provided key, from oldest to newest session. * @throws NullPointerException If null is used for any of the keys. */ KeyValueIterator, AGG> fetch(final K from, final K to); + + /** + * Retrieve all aggregated sessions for the given range of keys. + * This iterator must be closed after use. + *

+ * For each key, the iterator guarantees ordering of sessions, starting from the newest/latest + * available session to the oldest/earliest session. + * + * @param from first key in the range to find aggregated session values for + * @param to last key in the range to find aggregated session values for + * @return backward KeyValueIterator containing all sessions for the provided key, from newest to oldest session. + * @throws NullPointerException If null is used for any of the keys. + */ + default KeyValueIterator, AGG> backwardFetch(final K from, final K to) { + throw new UnsupportedOperationException("This API is not supported by this implementation of ReadOnlySessionStore."); + } } diff --git a/streams/src/main/java/org/apache/kafka/streams/state/SessionStore.java b/streams/src/main/java/org/apache/kafka/streams/state/SessionStore.java index faaa751489af4..47f48d5e8143c 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/SessionStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/SessionStore.java @@ -34,46 +34,6 @@ */ public interface SessionStore extends StateStore, ReadOnlySessionStore { - /** - * Fetch any sessions with the matching key and the sessions end is ≥ earliestSessionEndTime and the sessions - * start is ≤ latestSessionStartTime - * - * This iterator must be closed after use. - * - * @param key the key to return sessions for - * @param earliestSessionEndTime the end timestamp of the earliest session to search for - * @param latestSessionStartTime the end timestamp of the latest session to search for - * @return iterator of sessions with the matching key and aggregated values - * @throws NullPointerException If null is used for key. - */ - KeyValueIterator, AGG> findSessions(final K key, final long earliestSessionEndTime, final long latestSessionStartTime); - - /** - * Fetch any sessions in the given range of keys and the sessions end is ≥ earliestSessionEndTime and the sessions - * start is ≤ latestSessionStartTime - * - * This iterator must be closed after use. - * - * @param keyFrom The first key that could be in the range - * @param keyTo The last key that could be in the range - * @param earliestSessionEndTime the end timestamp of the earliest session to search for - * @param latestSessionStartTime the end timestamp of the latest session to search for - * @return iterator of sessions with the matching keys and aggregated values - * @throws NullPointerException If null is used for any key. - */ - KeyValueIterator, AGG> findSessions(final K keyFrom, final K keyTo, final long earliestSessionEndTime, final long latestSessionStartTime); - - /** - * Get the value of key from a single session. - * - * @param key the key to fetch - * @param startTime start timestamp of the session - * @param endTime end timestamp of the session - * @return The value or {@code null} if no session associated with the key can be found - * @throws NullPointerException If {@code null} is used for any key. - */ - AGG fetchSession(final K key, final long startTime, final long endTime); - /** * Remove the session aggregated with provided {@link Windowed} key from the store * @param sessionKey key of the session to remove diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingSessionStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingSessionStore.java index c92123dbe757c..d0fe25a6050ca 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingSessionStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingSessionStore.java @@ -76,7 +76,6 @@ public void init(final StateStoreContext context, final StateStore root) { super.init(context, root); } - @SuppressWarnings("unchecked") private void initInternal(final InternalProcessorContext context) { this.context = context; @@ -159,7 +158,7 @@ public KeyValueIterator, byte[]> findSessions(final Bytes key, validateStoreOpen(); final PeekingKeyValueIterator cacheIterator = wrapped().persistent() ? - new CacheIteratorWrapper(key, earliestSessionEndTime, latestSessionStartTime) : + new CacheIteratorWrapper(key, earliestSessionEndTime, latestSessionStartTime, true) : context.cache().range(cacheName, cacheFunction.cacheKey(keySchema.lowerRangeFixedSize(key, earliestSessionEndTime)), cacheFunction.cacheKey(keySchema.upperRangeFixedSize(key, latestSessionStartTime)) @@ -174,7 +173,38 @@ public KeyValueIterator, byte[]> findSessions(final Bytes key, latestSessionStartTime); final PeekingKeyValueIterator filteredCacheIterator = new FilteredCacheIterator(cacheIterator, hasNextCondition, cacheFunction); - return new MergedSortedCacheSessionStoreIterator(filteredCacheIterator, storeIterator, cacheFunction); + return new MergedSortedCacheSessionStoreIterator(filteredCacheIterator, storeIterator, cacheFunction, true); + } + + @Override + public KeyValueIterator, byte[]> backwardFindSessions(final Bytes key, + final long earliestSessionEndTime, + final long latestSessionStartTime) { + validateStoreOpen(); + + final PeekingKeyValueIterator cacheIterator = wrapped().persistent() ? + new CacheIteratorWrapper(key, earliestSessionEndTime, latestSessionStartTime, false) : + context.cache().reverseRange( + cacheName, + cacheFunction.cacheKey(keySchema.lowerRangeFixedSize(key, earliestSessionEndTime)), + cacheFunction.cacheKey(keySchema.upperRangeFixedSize(key, latestSessionStartTime) + ) + ); + + final KeyValueIterator, byte[]> storeIterator = wrapped().backwardFindSessions( + key, + earliestSessionEndTime, + latestSessionStartTime + ); + final HasNextCondition hasNextCondition = keySchema.hasNextCondition( + key, + key, + earliestSessionEndTime, + latestSessionStartTime + ); + final PeekingKeyValueIterator filteredCacheIterator = + new FilteredCacheIterator(cacheIterator, hasNextCondition, cacheFunction); + return new MergedSortedCacheSessionStoreIterator(filteredCacheIterator, storeIterator, cacheFunction, false); } @Override @@ -205,7 +235,39 @@ public KeyValueIterator, byte[]> findSessions(final Bytes keyFro latestSessionStartTime); final PeekingKeyValueIterator filteredCacheIterator = new FilteredCacheIterator(cacheIterator, hasNextCondition, cacheFunction); - return new MergedSortedCacheSessionStoreIterator(filteredCacheIterator, storeIterator, cacheFunction); + return new MergedSortedCacheSessionStoreIterator(filteredCacheIterator, storeIterator, cacheFunction, true); + } + + @Override + public KeyValueIterator, byte[]> backwardFindSessions(final Bytes keyFrom, + final Bytes keyTo, + final long earliestSessionEndTime, + final long latestSessionStartTime) { + if (keyFrom.compareTo(keyTo) > 0) { + LOG.warn("Returning empty iterator for fetch with invalid key range: from > to. " + + "This may be due to range arguments set in the wrong order, " + + "or serdes that don't preserve ordering when lexicographically comparing the serialized bytes. " + + "Note that the built-in numerical serdes do not follow this for negative numbers"); + return KeyValueIterators.emptyIterator(); + } + + validateStoreOpen(); + + final Bytes cacheKeyFrom = cacheFunction.cacheKey(keySchema.lowerRange(keyFrom, earliestSessionEndTime)); + final Bytes cacheKeyTo = cacheFunction.cacheKey(keySchema.upperRange(keyTo, latestSessionStartTime)); + final ThreadCache.MemoryLRUCacheBytesIterator cacheIterator = context.cache().reverseRange(cacheName, cacheKeyFrom, cacheKeyTo); + + final KeyValueIterator, byte[]> storeIterator = + wrapped().backwardFindSessions(keyFrom, keyTo, earliestSessionEndTime, latestSessionStartTime); + final HasNextCondition hasNextCondition = keySchema.hasNextCondition( + keyFrom, + keyTo, + earliestSessionEndTime, + latestSessionStartTime + ); + final PeekingKeyValueIterator filteredCacheIterator = + new FilteredCacheIterator(cacheIterator, hasNextCondition, cacheFunction); + return new MergedSortedCacheSessionStoreIterator(filteredCacheIterator, storeIterator, cacheFunction, false); } @Override @@ -232,6 +294,12 @@ public KeyValueIterator, byte[]> fetch(final Bytes key) { return findSessions(key, 0, Long.MAX_VALUE); } + @Override + public KeyValueIterator, byte[]> backwardFetch(final Bytes key) { + Objects.requireNonNull(key, "key cannot be null"); + return backwardFindSessions(key, 0, Long.MAX_VALUE); + } + @Override public KeyValueIterator, byte[]> fetch(final Bytes from, final Bytes to) { @@ -240,6 +308,14 @@ public KeyValueIterator, byte[]> fetch(final Bytes from, return findSessions(from, to, 0, Long.MAX_VALUE); } + @Override + public KeyValueIterator, byte[]> backwardFetch(final Bytes from, + final Bytes to) { + Objects.requireNonNull(from, "from cannot be null"); + Objects.requireNonNull(to, "to cannot be null"); + return backwardFindSessions(from, to, 0, Long.MAX_VALUE); + } + public void flush() { context.cache().flush(cacheName); wrapped().flush(); @@ -269,6 +345,8 @@ private class CacheIteratorWrapper implements PeekingKeyValueIterator lastSegmentId) { - current = null; - return; - } + if (currentSegmentId > lastSegmentId) { + current = null; + return; + } - setCacheKeyRange(currentSegmentBeginTime(), currentSegmentLastTime()); + setCacheKeyRange(currentSegmentBeginTime(), currentSegmentLastTime()); + + current.close(); + + current = context.cache().range(cacheName, cacheKeyFrom, cacheKeyTo); + } else { + --currentSegmentId; + + if (currentSegmentId < lastSegmentId) { + current = null; + return; + } + + setCacheKeyRange(currentSegmentBeginTime(), currentSegmentLastTime()); + + current.close(); + + current = context.cache().reverseRange(cacheName, cacheKeyFrom, cacheKeyTo); + } - current.close(); - current = context.cache().range(cacheName, cacheKeyFrom, cacheKeyTo); } private void setCacheKeyRange(final long lowerRangeEndTime, final long upperRangeEndTime) { @@ -376,7 +482,7 @@ private void setCacheKeyRange(final long lowerRangeEndTime, final long upperRang throw new IllegalStateException("Error iterating over segments: segment interval has changed"); } - if (keyFrom == keyTo) { + if (keyFrom.equals(keyTo)) { cacheKeyFrom = cacheFunction.cacheKey(segmentLowerRangeFixedSize(keyFrom, lowerRangeEndTime)); cacheKeyTo = cacheFunction.cacheKey(segmentUpperRangeFixedSize(keyTo, upperRangeEndTime)); } else { diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingSessionBytesStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingSessionBytesStore.java index 0d2133d9f40e5..556a67e60ee90 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingSessionBytesStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingSessionBytesStore.java @@ -59,11 +59,25 @@ public KeyValueIterator, byte[]> findSessions(final Bytes key, f return wrapped().findSessions(key, earliestSessionEndTime, latestSessionStartTime); } + @Override + public KeyValueIterator, byte[]> backwardFindSessions(final Bytes key, + final long earliestSessionEndTime, + final long latestSessionStartTime) { + return wrapped().backwardFindSessions(key, earliestSessionEndTime, latestSessionStartTime); + } + @Override public KeyValueIterator, byte[]> findSessions(final Bytes keyFrom, final Bytes keyTo, final long earliestSessionEndTime, final long latestSessionStartTime) { return wrapped().findSessions(keyFrom, keyTo, earliestSessionEndTime, latestSessionStartTime); } + @Override + public KeyValueIterator, byte[]> backwardFindSessions(final Bytes keyFrom, final Bytes keyTo, + final long earliestSessionEndTime, + final long latestSessionStartTime) { + return wrapped().backwardFindSessions(keyFrom, keyTo, earliestSessionEndTime, latestSessionStartTime); + } + @Override public void remove(final Windowed sessionKey) { wrapped().remove(sessionKey); @@ -81,11 +95,21 @@ public byte[] fetchSession(final Bytes key, final long startTime, final long end return wrapped().fetchSession(key, startTime, endTime); } + @Override + public KeyValueIterator, byte[]> backwardFetch(final Bytes key) { + return wrapped().backwardFetch(key); + } + @Override public KeyValueIterator, byte[]> fetch(final Bytes key) { return wrapped().fetch(key); } + @Override + public KeyValueIterator, byte[]> backwardFetch(final Bytes from, final Bytes to) { + return wrapped().backwardFetch(from, to); + } + @Override public KeyValueIterator, byte[]> fetch(final Bytes from, final Bytes to) { return wrapped().fetch(from, to); diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/CompositeReadOnlySessionStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/CompositeReadOnlySessionStore.java index 63d551c1d089e..72233122a6286 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/CompositeReadOnlySessionStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/CompositeReadOnlySessionStore.java @@ -42,6 +42,137 @@ public CompositeReadOnlySessionStore(final StateStoreProvider storeProvider, this.storeName = storeName; } + @Override + public KeyValueIterator, V> findSessions(final K key, + final long earliestSessionEndTime, + final long latestSessionStartTime) { + Objects.requireNonNull(key, "key can't be null"); + final List> stores = storeProvider.stores(storeName, queryableStoreType); + for (final ReadOnlySessionStore store : stores) { + try { + final KeyValueIterator, V> result = + store.findSessions(key, earliestSessionEndTime, latestSessionStartTime); + + if (!result.hasNext()) { + result.close(); + } else { + return result; + } + } catch (final InvalidStateStoreException ise) { + throw new InvalidStateStoreException( + "State store [" + storeName + "] is not available anymore" + + " and may have been migrated to another instance; " + + "please re-discover its location from the state metadata.", + ise + ); + } + } + return KeyValueIterators.emptyIterator(); + } + + @Override + public KeyValueIterator, V> backwardFindSessions(final K key, + final long earliestSessionEndTime, + final long latestSessionStartTime) { + Objects.requireNonNull(key, "key can't be null"); + final List> stores = storeProvider.stores(storeName, queryableStoreType); + for (final ReadOnlySessionStore store : stores) { + try { + final KeyValueIterator, V> result = store.backwardFindSessions(key, earliestSessionEndTime, latestSessionStartTime); + if (!result.hasNext()) { + result.close(); + } else { + return result; + } + } catch (final InvalidStateStoreException ise) { + throw new InvalidStateStoreException( + "State store [" + storeName + "] is not available anymore" + + " and may have been migrated to another instance; " + + "please re-discover its location from the state metadata.", + ise + ); + } + } + return KeyValueIterators.emptyIterator(); + } + + @Override + public KeyValueIterator, V> findSessions(final K keyFrom, + final K keyTo, + final long earliestSessionEndTime, + final long latestSessionStartTime) { + Objects.requireNonNull(keyFrom, "from can't be null"); + Objects.requireNonNull(keyTo, "to can't be null"); + final List> stores = storeProvider.stores(storeName, queryableStoreType); + for (final ReadOnlySessionStore store : stores) { + try { + final KeyValueIterator, V> result = + store.findSessions(keyFrom, keyTo, earliestSessionEndTime, latestSessionStartTime); + if (!result.hasNext()) { + result.close(); + } else { + return result; + } + } catch (final InvalidStateStoreException ise) { + throw new InvalidStateStoreException( + "State store [" + storeName + "] is not available anymore" + + " and may have been migrated to another instance; " + + "please re-discover its location from the state metadata.", + ise + ); + } + } + return KeyValueIterators.emptyIterator(); + } + + @Override + public KeyValueIterator, V> backwardFindSessions(final K keyFrom, + final K keyTo, + final long earliestSessionEndTime, + final long latestSessionStartTime) { + Objects.requireNonNull(keyFrom, "from can't be null"); + Objects.requireNonNull(keyTo, "to can't be null"); + final List> stores = storeProvider.stores(storeName, queryableStoreType); + for (final ReadOnlySessionStore store : stores) { + try { + final KeyValueIterator, V> result = + store.backwardFindSessions(keyFrom, keyTo, earliestSessionEndTime, latestSessionStartTime); + if (!result.hasNext()) { + result.close(); + } else { + return result; + } + } catch (final InvalidStateStoreException ise) { + throw new InvalidStateStoreException( + "State store [" + storeName + "] is not available anymore" + + " and may have been migrated to another instance; " + + "please re-discover its location from the state metadata.", + ise + ); + } + } + return KeyValueIterators.emptyIterator(); + } + + @Override + public V fetchSession(final K key, final long startTime, final long endTime) { + Objects.requireNonNull(key, "key can't be null"); + final List> stores = storeProvider.stores(storeName, queryableStoreType); + for (final ReadOnlySessionStore store : stores) { + try { + return store.fetchSession(key, startTime, endTime); + } catch (final InvalidStateStoreException ise) { + throw new InvalidStateStoreException( + "State store [" + storeName + "] is not available anymore" + + " and may have been migrated to another instance; " + + "please re-discover its location from the state metadata.", + ise + ); + } + } + return null; + } + @Override public KeyValueIterator, V> fetch(final K key) { Objects.requireNonNull(key, "key can't be null"); @@ -64,6 +195,30 @@ public KeyValueIterator, V> fetch(final K key) { return KeyValueIterators.emptyIterator(); } + @Override + public KeyValueIterator, V> backwardFetch(final K key) { + Objects.requireNonNull(key, "key can't be null"); + final List> stores = storeProvider.stores(storeName, queryableStoreType); + for (final ReadOnlySessionStore store : stores) { + try { + final KeyValueIterator, V> result = store.backwardFetch(key); + if (!result.hasNext()) { + result.close(); + } else { + return result; + } + } catch (final InvalidStateStoreException ise) { + throw new InvalidStateStoreException( + "State store [" + storeName + "] is not available anymore" + + " and may have been migrated to another instance; " + + "please re-discover its location from the state metadata.", + ise + ); + } + } + return KeyValueIterators.emptyIterator(); + } + @Override public KeyValueIterator, V> fetch(final K from, final K to) { Objects.requireNonNull(from, "from can't be null"); @@ -74,4 +229,18 @@ public KeyValueIterator, V> fetch(final K from, final K to) { storeProvider.stores(storeName, queryableStoreType).iterator(), nextIteratorFunction)); } + + @Override + public KeyValueIterator, V> backwardFetch(final K from, final K to) { + Objects.requireNonNull(from, "from can't be null"); + Objects.requireNonNull(to, "to can't be null"); + final NextIteratorFunction, V, ReadOnlySessionStore> nextIteratorFunction = store -> store.backwardFetch(from, to); + return new DelegatingPeekingKeyValueIterator<>( + storeName, + new CompositeKeyValueIterator<>( + storeProvider.stores(storeName, queryableStoreType).iterator(), + nextIteratorFunction + ) + ); + } } diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemorySessionStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemorySessionStore.java index 2e45b48b7df1d..46c4de2f04873 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemorySessionStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemorySessionStore.java @@ -168,7 +168,25 @@ public KeyValueIterator, byte[]> findSessions(final Bytes key, return registerNewIterator(key, key, latestSessionStartTime, - endTimeMap.tailMap(earliestSessionEndTime, true).entrySet().iterator()); + endTimeMap.tailMap(earliestSessionEndTime, true).entrySet().iterator(), + true); + } + + @Override + public KeyValueIterator, byte[]> backwardFindSessions(final Bytes key, + final long earliestSessionEndTime, + final long latestSessionStartTime) { + Objects.requireNonNull(key, "key cannot be null"); + + removeExpiredSegments(); + + return registerNewIterator( + key, + key, + latestSessionStartTime, + endTimeMap.tailMap(earliestSessionEndTime, true).descendingMap().entrySet().iterator(), + false + ); } @Override @@ -192,7 +210,35 @@ public KeyValueIterator, byte[]> findSessions(final Bytes keyFro return registerNewIterator(keyFrom, keyTo, latestSessionStartTime, - endTimeMap.tailMap(earliestSessionEndTime, true).entrySet().iterator()); + endTimeMap.tailMap(earliestSessionEndTime, true).entrySet().iterator(), + true); + } + + @Override + public KeyValueIterator, byte[]> backwardFindSessions(final Bytes keyFrom, + final Bytes keyTo, + final long earliestSessionEndTime, + final long latestSessionStartTime) { + Objects.requireNonNull(keyFrom, "from key cannot be null"); + Objects.requireNonNull(keyTo, "to key cannot be null"); + + removeExpiredSegments(); + + if (keyFrom.compareTo(keyTo) > 0) { + LOG.warn("Returning empty iterator for fetch with invalid key range: from > to. " + + "This may be due to range arguments set in the wrong order, " + + "or serdes that don't preserve ordering when lexicographically comparing the serialized bytes. " + + "Note that the built-in numerical serdes do not follow this for negative numbers"); + return KeyValueIterators.emptyIterator(); + } + + return registerNewIterator( + keyFrom, + keyTo, + latestSessionStartTime, + endTimeMap.tailMap(earliestSessionEndTime, true).descendingMap().entrySet().iterator(), + false + ); } @Override @@ -202,7 +248,17 @@ public KeyValueIterator, byte[]> fetch(final Bytes key) { removeExpiredSegments(); - return registerNewIterator(key, key, Long.MAX_VALUE, endTimeMap.entrySet().iterator()); + return registerNewIterator(key, key, Long.MAX_VALUE, endTimeMap.entrySet().iterator(), true); + } + + @Override + public KeyValueIterator, byte[]> backwardFetch(final Bytes key) { + + Objects.requireNonNull(key, "key cannot be null"); + + removeExpiredSegments(); + + return registerNewIterator(key, key, Long.MAX_VALUE, endTimeMap.descendingMap().entrySet().iterator(), false); } @Override @@ -214,7 +270,17 @@ public KeyValueIterator, byte[]> fetch(final Bytes from, final B removeExpiredSegments(); - return registerNewIterator(from, to, Long.MAX_VALUE, endTimeMap.entrySet().iterator()); + return registerNewIterator(from, to, Long.MAX_VALUE, endTimeMap.entrySet().iterator(), false); + } + + @Override + public KeyValueIterator, byte[]> backwardFetch(final Bytes from, final Bytes to) { + Objects.requireNonNull(from, "from key cannot be null"); + Objects.requireNonNull(to, "to key cannot be null"); + + removeExpiredSegments(); + + return registerNewIterator(from, to, Long.MAX_VALUE, endTimeMap.descendingMap().entrySet().iterator(), true); } @Override @@ -259,8 +325,17 @@ private void removeExpiredSegments() { private InMemorySessionStoreIterator registerNewIterator(final Bytes keyFrom, final Bytes keyTo, final long latestSessionStartTime, - final Iterator>>> endTimeIterator) { - final InMemorySessionStoreIterator iterator = new InMemorySessionStoreIterator(keyFrom, keyTo, latestSessionStartTime, endTimeIterator, openIterators::remove); + final Iterator>>> endTimeIterator, + final boolean forward) { + final InMemorySessionStoreIterator iterator = + new InMemorySessionStoreIterator( + keyFrom, + keyTo, + latestSessionStartTime, + endTimeIterator, + openIterators::remove, + forward + ); openIterators.add(iterator); return iterator; } @@ -285,17 +360,21 @@ private static class InMemorySessionStoreIterator implements KeyValueIterator>>> endTimeIterator, - final ClosingCallback callback) { + final ClosingCallback callback, + final boolean forward) { this.keyFrom = keyFrom; this.keyTo = keyTo; this.latestSessionStartTime = latestSessionStartTime; this.endTimeIterator = endTimeIterator; this.callback = callback; + this.forward = forward; setAllIterators(); } @@ -366,7 +445,18 @@ private void setAllIterators() { while (endTimeIterator.hasNext()) { final Entry>> nextEndTimeEntry = endTimeIterator.next(); currentEndTime = nextEndTimeEntry.getKey(); - keyIterator = nextEndTimeEntry.getValue().subMap(keyFrom, true, keyTo, true).entrySet().iterator(); + if (forward) { + keyIterator = nextEndTimeEntry.getValue() + .subMap(keyFrom, true, keyTo, true) + .entrySet() + .iterator(); + } else { + keyIterator = nextEndTimeEntry.getValue() + .subMap(keyFrom, true, keyTo, true) + .descendingMap() + .entrySet() + .iterator(); + } if (setInnerIterators()) { return; @@ -383,9 +473,22 @@ private boolean setInnerIterators() { currentKey = nextKeyEntry.getKey(); if (latestSessionStartTime == Long.MAX_VALUE) { - recordIterator = nextKeyEntry.getValue().entrySet().iterator(); + if (forward) { + recordIterator = nextKeyEntry.getValue().descendingMap().entrySet().iterator(); + } else { + recordIterator = nextKeyEntry.getValue().entrySet().iterator(); + } } else { - recordIterator = nextKeyEntry.getValue().headMap(latestSessionStartTime, true).entrySet().iterator(); + if (forward) { + recordIterator = nextKeyEntry.getValue() + .headMap(latestSessionStartTime, true) + .descendingMap() + .entrySet().iterator(); + } else { + recordIterator = nextKeyEntry.getValue() + .headMap(latestSessionStartTime, true) + .entrySet().iterator(); + } } if (recordIterator.hasNext()) { diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MergedSortedCacheSessionStoreIterator.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MergedSortedCacheSessionStoreIterator.java index ff45a418889a9..cd0c0df2c2f57 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MergedSortedCacheSessionStoreIterator.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MergedSortedCacheSessionStoreIterator.java @@ -32,8 +32,9 @@ class MergedSortedCacheSessionStoreIterator extends AbstractMergedSortedCacheSto MergedSortedCacheSessionStoreIterator(final PeekingKeyValueIterator cacheIterator, final KeyValueIterator, byte[]> storeIterator, - final SegmentedCacheFunction cacheFunction) { - super(cacheIterator, storeIterator, true); + final SegmentedCacheFunction cacheFunction, + final boolean forward) { + super(cacheIterator, storeIterator, forward); this.cacheFunction = cacheFunction; } diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java index 8b9256dc5d91b..f7f25c054f326 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java @@ -221,6 +221,18 @@ public KeyValueIterator, V> fetch(final K key) { time); } + @Override + public KeyValueIterator, V> backwardFetch(final K key) { + Objects.requireNonNull(key, "key cannot be null"); + return new MeteredWindowedKeyValueIterator<>( + wrapped().backwardFetch(keyBytes(key)), + fetchSensor, + streamsMetrics, + serdes, + time + ); + } + @Override public KeyValueIterator, V> fetch(final K from, final K to) { @@ -234,6 +246,20 @@ public KeyValueIterator, V> fetch(final K from, time); } + @Override + public KeyValueIterator, V> backwardFetch(final K from, + final K to) { + Objects.requireNonNull(from, "from cannot be null"); + Objects.requireNonNull(to, "to cannot be null"); + return new MeteredWindowedKeyValueIterator<>( + wrapped().backwardFetch(keyBytes(from), keyBytes(to)), + fetchSensor, + streamsMetrics, + serdes, + time + ); + } + @Override public KeyValueIterator, V> findSessions(final K key, final long earliestSessionEndTime, @@ -251,6 +277,25 @@ public KeyValueIterator, V> findSessions(final K key, time); } + @Override + public KeyValueIterator, V> backwardFindSessions(final K key, + final long earliestSessionEndTime, + final long latestSessionStartTime) { + Objects.requireNonNull(key, "key cannot be null"); + final Bytes bytesKey = keyBytes(key); + return new MeteredWindowedKeyValueIterator<>( + wrapped().backwardFindSessions( + bytesKey, + earliestSessionEndTime, + latestSessionStartTime + ), + fetchSensor, + streamsMetrics, + serdes, + time + ); + } + @Override public KeyValueIterator, V> findSessions(final K keyFrom, final K keyTo, @@ -272,6 +317,29 @@ public KeyValueIterator, V> findSessions(final K keyFrom, time); } + @Override + public KeyValueIterator, V> backwardFindSessions(final K keyFrom, + final K keyTo, + final long earliestSessionEndTime, + final long latestSessionStartTime) { + Objects.requireNonNull(keyFrom, "keyFrom cannot be null"); + Objects.requireNonNull(keyTo, "keyTo cannot be null"); + final Bytes bytesKeyFrom = keyBytes(keyFrom); + final Bytes bytesKeyTo = keyBytes(keyTo); + return new MeteredWindowedKeyValueIterator<>( + wrapped().backwardFindSessions( + bytesKeyFrom, + bytesKeyTo, + earliestSessionEndTime, + latestSessionStartTime + ), + fetchSensor, + streamsMetrics, + serdes, + time + ); + } + @Override public void flush() { maybeMeasureLatency(super::flush, time, flushSensor); diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBSessionStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBSessionStore.java index 2f7a211d5266a..338769abea4a5 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBSessionStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBSessionStore.java @@ -42,6 +42,18 @@ public KeyValueIterator, byte[]> findSessions(final Bytes key, return new WrappedSessionStoreIterator(bytesIterator); } + @Override + public KeyValueIterator, byte[]> backwardFindSessions(final Bytes key, + final long earliestSessionEndTime, + final long latestSessionStartTime) { + final KeyValueIterator bytesIterator = wrapped().backwardFetch( + key, + earliestSessionEndTime, + latestSessionStartTime + ); + return new WrappedSessionStoreIterator(bytesIterator); + } + @Override public KeyValueIterator, byte[]> findSessions(final Bytes keyFrom, final Bytes keyTo, @@ -56,6 +68,20 @@ public KeyValueIterator, byte[]> findSessions(final Bytes keyFro return new WrappedSessionStoreIterator(bytesIterator); } + @Override + public KeyValueIterator, byte[]> backwardFindSessions(final Bytes keyFrom, + final Bytes keyTo, + final long earliestSessionEndTime, + final long latestSessionStartTime) { + final KeyValueIterator bytesIterator = wrapped().backwardFetch( + keyFrom, + keyTo, + earliestSessionEndTime, + latestSessionStartTime + ); + return new WrappedSessionStoreIterator(bytesIterator); + } + @Override public byte[] fetchSession(final Bytes key, final long startTime, final long endTime) { return wrapped().get(SessionKeySchema.toBinary(key, startTime, endTime)); @@ -66,11 +92,21 @@ public KeyValueIterator, byte[]> fetch(final Bytes key) { return findSessions(key, 0, Long.MAX_VALUE); } + @Override + public KeyValueIterator, byte[]> backwardFetch(final Bytes key) { + return backwardFindSessions(key, 0, Long.MAX_VALUE); + } + @Override public KeyValueIterator, byte[]> fetch(final Bytes from, final Bytes to) { return findSessions(from, to, 0, Long.MAX_VALUE); } + @Override + public KeyValueIterator, byte[]> backwardFetch(final Bytes from, final Bytes to) { + return backwardFindSessions(from, to, 0, Long.MAX_VALUE); + } + @Override public void remove(final Windowed key) { wrapped().remove(SessionKeySchema.toBinary(key)); diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractSessionBytesStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractSessionBytesStoreTest.java index bb425a9f8e8d6..b355f0e34dacd 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractSessionBytesStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractSessionBytesStoreTest.java @@ -134,6 +134,31 @@ public void shouldPutAndFindSessionsInRange() { } } + @Test + public void shouldPutAndBackwardFindSessionsInRange() { + final String key = "a"; + final Windowed a1 = new Windowed<>(key, new SessionWindow(10, 10L)); + final Windowed a2 = new Windowed<>(key, new SessionWindow(500L, 1000L)); + sessionStore.put(a1, 1L); + sessionStore.put(a2, 2L); + sessionStore.put(new Windowed<>(key, new SessionWindow(1500L, 2000L)), 1L); + sessionStore.put(new Windowed<>(key, new SessionWindow(2500L, 3000L)), 2L); + + final List, Long>> expected = + asList(KeyValue.pair(a1, 1L), KeyValue.pair(a2, 2L)); + + try (final KeyValueIterator, Long> values = sessionStore.backwardFindSessions(key, 0, 1000L)) { + assertEquals(new HashSet<>(expected), toSet(values)); + } + + final List, Long>> expected2 = + Collections.singletonList(KeyValue.pair(a2, 2L)); + + try (final KeyValueIterator, Long> values2 = sessionStore.backwardFindSessions(key, 400L, 600L)) { + assertEquals(new HashSet<>(expected2), toSet(values2)); + } + } + @Test public void shouldFetchAllSessionsWithSameRecordKey() { final List, Long>> expected = Arrays.asList( @@ -154,6 +179,27 @@ public void shouldFetchAllSessionsWithSameRecordKey() { } } + @Test + public void shouldBackwardFetchAllSessionsWithSameRecordKey() { + final List, Long>> expected = asList( + KeyValue.pair(new Windowed<>("a", new SessionWindow(0, 0)), 1L), + KeyValue.pair(new Windowed<>("a", new SessionWindow(10, 10)), 2L), + KeyValue.pair(new Windowed<>("a", new SessionWindow(100, 100)), 3L), + KeyValue.pair(new Windowed<>("a", new SessionWindow(1000, 1000)), 4L) + ); + + for (final KeyValue, Long> kv : expected) { + sessionStore.put(kv.key, kv.value); + } + + // add one that shouldn't appear in the results + sessionStore.put(new Windowed<>("aa", new SessionWindow(0, 0)), 5L); + + try (final KeyValueIterator, Long> values = sessionStore.backwardFetch("a")) { + assertEquals(new HashSet<>(expected), toSet(values)); + } + } + @Test public void shouldFetchAllSessionsWithinKeyRange() { final List, Long>> expected = Arrays.asList( @@ -176,6 +222,29 @@ public void shouldFetchAllSessionsWithinKeyRange() { } } + @Test + public void shouldBackwardFetchAllSessionsWithinKeyRange() { + final List, Long>> expected = asList( + KeyValue.pair(new Windowed<>("aa", new SessionWindow(10, 10)), 2L), + KeyValue.pair(new Windowed<>("b", new SessionWindow(1000, 1000)), 4L), + + KeyValue.pair(new Windowed<>("aaa", new SessionWindow(100, 100)), 3L), + KeyValue.pair(new Windowed<>("bb", new SessionWindow(1500, 2000)), 5L) + ); + + for (final KeyValue, Long> kv : expected) { + sessionStore.put(kv.key, kv.value); + } + + // add some that shouldn't appear in the results + sessionStore.put(new Windowed<>("a", new SessionWindow(0, 0)), 1L); + sessionStore.put(new Windowed<>("bbb", new SessionWindow(2500, 3000)), 6L); + + try (final KeyValueIterator, Long> values = sessionStore.backwardFetch("aa", "bb")) { + assertEquals(new HashSet<>(expected), toSet(values)); + } + } + @Test public void shouldFetchExactSession() { sessionStore.put(new Windowed<>("a", new SessionWindow(0, 4)), 1L); @@ -208,6 +277,22 @@ public void shouldFindValuesWithinMergingSessionWindowRange() { } } + @Test + public void shouldBackwardFindValuesWithinMergingSessionWindowRange() { + final String key = "a"; + sessionStore.put(new Windowed<>(key, new SessionWindow(0L, 0L)), 1L); + sessionStore.put(new Windowed<>(key, new SessionWindow(1000L, 1000L)), 2L); + + final List, Long>> expected = asList( + KeyValue.pair(new Windowed<>(key, new SessionWindow(0L, 0L)), 1L), + KeyValue.pair(new Windowed<>(key, new SessionWindow(1000L, 1000L)), 2L) + ); + + try (final KeyValueIterator, Long> results = sessionStore.backwardFindSessions(key, -1, 1000L)) { + assertEquals(new HashSet<>(expected), toSet(results)); + } + } + @Test public void shouldRemove() { sessionStore.put(new Windowed<>("a", new SessionWindow(0, 1000)), 1L); @@ -261,6 +346,27 @@ public void shouldFindSessionsToMerge() { } } + @Test + public void shouldBackwardFindSessionsToMerge() { + final Windowed session1 = new Windowed<>("a", new SessionWindow(0, 100)); + final Windowed session2 = new Windowed<>("a", new SessionWindow(101, 200)); + final Windowed session3 = new Windowed<>("a", new SessionWindow(201, 300)); + final Windowed session4 = new Windowed<>("a", new SessionWindow(301, 400)); + final Windowed session5 = new Windowed<>("a", new SessionWindow(401, 500)); + sessionStore.put(session1, 1L); + sessionStore.put(session2, 2L); + sessionStore.put(session3, 3L); + sessionStore.put(session4, 4L); + sessionStore.put(session5, 5L); + + final List, Long>> expected = + asList(KeyValue.pair(session2, 2L), KeyValue.pair(session3, 3L)); + + try (final KeyValueIterator, Long> results = sessionStore.backwardFindSessions("a", 150, 300)) { + assertEquals(new HashSet<>(expected), toSet(results)); + } + } + @Test public void shouldFetchExactKeys() { sessionStore = buildSessionStore(0x7a00000000000000L, Serdes.String(), Serdes.Long()); @@ -298,6 +404,43 @@ public void shouldFetchExactKeys() { } } + @Test + public void shouldBackwardFetchExactKeys() { + sessionStore = buildSessionStore(0x7a00000000000000L, Serdes.String(), Serdes.Long()); + sessionStore.init((StateStoreContext) context, sessionStore); + + sessionStore.put(new Windowed<>("a", new SessionWindow(0, 0)), 1L); + sessionStore.put(new Windowed<>("aa", new SessionWindow(0, 10)), 2L); + sessionStore.put(new Windowed<>("a", new SessionWindow(10, 20)), 3L); + sessionStore.put(new Windowed<>("aa", new SessionWindow(10, 20)), 4L); + sessionStore.put(new Windowed<>("a", + new SessionWindow(0x7a00000000000000L - 2, 0x7a00000000000000L - 1)), 5L); + + try (final KeyValueIterator, Long> iterator = + sessionStore.backwardFindSessions("a", 0, Long.MAX_VALUE) + ) { + assertThat(valuesToSet(iterator), equalTo(new HashSet<>(asList(1L, 3L, 5L)))); + } + + try (final KeyValueIterator, Long> iterator = + sessionStore.backwardFindSessions("aa", 0, Long.MAX_VALUE) + ) { + assertThat(valuesToSet(iterator), equalTo(new HashSet<>(asList(2L, 4L)))); + } + + try (final KeyValueIterator, Long> iterator = + sessionStore.backwardFindSessions("a", "aa", 0, Long.MAX_VALUE) + ) { + assertThat(valuesToSet(iterator), equalTo(new HashSet<>(asList(1L, 2L, 3L, 4L, 5L)))); + } + + try (final KeyValueIterator, Long> iterator = + sessionStore.backwardFindSessions("a", "aa", 10, 0) + ) { + assertThat(valuesToSet(iterator), equalTo(new HashSet<>(Collections.singletonList(2L)))); + } + } + @Test public void shouldFetchAndIterateOverExactBinaryKeys() { final SessionStore sessionStore = @@ -327,6 +470,35 @@ public void shouldFetchAndIterateOverExactBinaryKeys() { assertThat(valuesToSet(sessionStore.findSessions(key3, 0L, Long.MAX_VALUE)), equalTo(expectedKey3)); } + @Test + public void shouldBackwardFetchAndIterateOverExactBinaryKeys() { + final SessionStore sessionStore = + buildSessionStore(RETENTION_PERIOD, Serdes.Bytes(), Serdes.String()); + + sessionStore.init((StateStoreContext) context, sessionStore); + + final Bytes key1 = Bytes.wrap(new byte[] {0}); + final Bytes key2 = Bytes.wrap(new byte[] {0, 0}); + final Bytes key3 = Bytes.wrap(new byte[] {0, 0, 0}); + + sessionStore.put(new Windowed<>(key1, new SessionWindow(1, 100)), "1"); + sessionStore.put(new Windowed<>(key2, new SessionWindow(2, 100)), "2"); + sessionStore.put(new Windowed<>(key3, new SessionWindow(3, 100)), "3"); + sessionStore.put(new Windowed<>(key1, new SessionWindow(4, 100)), "4"); + sessionStore.put(new Windowed<>(key2, new SessionWindow(5, 100)), "5"); + sessionStore.put(new Windowed<>(key3, new SessionWindow(6, 100)), "6"); + sessionStore.put(new Windowed<>(key1, new SessionWindow(7, 100)), "7"); + sessionStore.put(new Windowed<>(key2, new SessionWindow(8, 100)), "8"); + sessionStore.put(new Windowed<>(key3, new SessionWindow(9, 100)), "9"); + + final Set expectedKey1 = new HashSet<>(asList("1", "4", "7")); + assertThat(valuesToSet(sessionStore.backwardFindSessions(key1, 0L, Long.MAX_VALUE)), equalTo(expectedKey1)); + final Set expectedKey2 = new HashSet<>(asList("2", "5", "8")); + assertThat(valuesToSet(sessionStore.backwardFindSessions(key2, 0L, Long.MAX_VALUE)), equalTo(expectedKey2)); + final Set expectedKey3 = new HashSet<>(asList("3", "6", "9")); + assertThat(valuesToSet(sessionStore.backwardFindSessions(key3, 0L, Long.MAX_VALUE)), equalTo(expectedKey3)); + } + @Test public void testIteratorPeek() { sessionStore.put(new Windowed<>("a", new SessionWindow(0, 0)), 1L); @@ -342,6 +514,21 @@ public void testIteratorPeek() { assertFalse(iterator.hasNext()); } + @Test + public void testIteratorPeekBackward() { + sessionStore.put(new Windowed<>("a", new SessionWindow(0, 0)), 1L); + sessionStore.put(new Windowed<>("aa", new SessionWindow(0, 10)), 2L); + sessionStore.put(new Windowed<>("a", new SessionWindow(10, 20)), 3L); + sessionStore.put(new Windowed<>("aa", new SessionWindow(10, 20)), 4L); + + final KeyValueIterator, Long> iterator = sessionStore.backwardFindSessions("a", 0L, 20); + + assertEquals(iterator.peekNextKey(), new Windowed<>("a", new SessionWindow(10L, 20L))); + assertEquals(iterator.peekNextKey(), iterator.next().key); + assertEquals(iterator.peekNextKey(), iterator.next().key); + assertFalse(iterator.hasNext()); + } + @Test public void shouldRestore() { final List, Long>> expected = Arrays.asList( diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/CacheFlushListenerStub.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/CacheFlushListenerStub.java new file mode 100644 index 0000000000000..ea4b147f00fa3 --- /dev/null +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/CacheFlushListenerStub.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.state.internals; + +import org.apache.kafka.common.serialization.Deserializer; +import org.apache.kafka.streams.kstream.internals.Change; + +import java.util.HashMap; +import java.util.Map; + +public class CacheFlushListenerStub implements CacheFlushListener { + private final Deserializer keyDeserializer; + private final Deserializer valueDeserializer; + final Map> forwarded = new HashMap<>(); + + CacheFlushListenerStub(final Deserializer keyDeserializer, + final Deserializer valueDeserializer) { + this.keyDeserializer = keyDeserializer; + this.valueDeserializer = valueDeserializer; + } + + @Override + public void apply(final byte[] key, + final byte[] newValue, + final byte[] oldValue, + final long timestamp) { + forwarded.put( + keyDeserializer.deserialize(null, key), + new Change<>( + valueDeserializer.deserialize(null, newValue), + valueDeserializer.deserialize(null, oldValue) + ) + ); + } +} diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingKeyValueStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingInMemoryKeyValueStoreTest.java similarity index 94% rename from streams/src/test/java/org/apache/kafka/streams/state/internals/CachingKeyValueStoreTest.java rename to streams/src/test/java/org/apache/kafka/streams/state/internals/CachingInMemoryKeyValueStoreTest.java index 89e2b0e96399c..a9085a6c13847 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingKeyValueStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingInMemoryKeyValueStoreTest.java @@ -17,14 +17,12 @@ package org.apache.kafka.streams.state.internals; import org.apache.kafka.common.metrics.Metrics; -import org.apache.kafka.common.serialization.Deserializer; import org.apache.kafka.common.serialization.Serde; import org.apache.kafka.common.serialization.StringDeserializer; import org.apache.kafka.common.utils.Bytes; import org.apache.kafka.common.utils.LogContext; import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.errors.InvalidStateStoreException; -import org.apache.kafka.streams.kstream.internals.Change; import org.apache.kafka.streams.processor.ProcessorContext; import org.apache.kafka.streams.processor.StateStoreContext; import org.apache.kafka.streams.processor.internals.MockStreamsMetrics; @@ -43,9 +41,7 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; -import java.util.HashMap; import java.util.List; -import java.util.Map; import static org.apache.kafka.streams.state.internals.ThreadCacheTest.memoryCacheEntrySize; import static org.hamcrest.CoreMatchers.equalTo; @@ -57,7 +53,7 @@ import static org.junit.Assert.assertThrows; import static org.junit.Assert.assertTrue; -public class CachingKeyValueStoreTest extends AbstractKeyValueStoreTest { +public class CachingInMemoryKeyValueStoreTest extends AbstractKeyValueStoreTest { private final static String TOPIC = "topic"; private static final String CACHE_NAMESPACE = "0_0-store-name"; @@ -527,27 +523,4 @@ private int addItemsToCache() { return i; } - public static class CacheFlushListenerStub implements CacheFlushListener { - final Deserializer keyDeserializer; - final Deserializer valueDeserializer; - final Map> forwarded = new HashMap<>(); - - CacheFlushListenerStub(final Deserializer keyDeserializer, - final Deserializer valueDeserializer) { - this.keyDeserializer = keyDeserializer; - this.valueDeserializer = valueDeserializer; - } - - @Override - public void apply(final byte[] key, - final byte[] newValue, - final byte[] oldValue, - final long timestamp) { - forwarded.put( - keyDeserializer.deserialize(null, key), - new Change<>( - valueDeserializer.deserialize(null, newValue), - valueDeserializer.deserialize(null, oldValue))); - } - } } diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingSessionStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingInMemorySessionStoreTest.java similarity index 71% rename from streams/src/test/java/org/apache/kafka/streams/state/internals/CachingSessionStoreTest.java rename to streams/src/test/java/org/apache/kafka/streams/state/internals/CachingInMemorySessionStoreTest.java index 05e97a2a4b1a5..e584e2ca706b6 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingSessionStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingInMemorySessionStoreTest.java @@ -46,14 +46,11 @@ import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.Collections; -import java.util.HashSet; import java.util.LinkedList; import java.util.List; import java.util.Random; -import java.util.Set; import static java.util.Arrays.asList; -import static org.apache.kafka.common.utils.Utils.mkSet; import static org.apache.kafka.test.StreamsTestUtils.toList; import static org.apache.kafka.test.StreamsTestUtils.verifyKeyValueList; import static org.apache.kafka.test.StreamsTestUtils.verifyWindowedKeyValue; @@ -68,7 +65,7 @@ import static org.junit.Assert.assertTrue; @SuppressWarnings("PointlessArithmeticExpression") -public class CachingSessionStoreTest { +public class CachingInMemorySessionStoreTest { private static final int MAX_CACHE_SIZE_BYTES = 600; private static final Long DEFAULT_TIMESTAMP = 10L; @@ -80,14 +77,14 @@ public class CachingSessionStoreTest { private final Bytes keyAA = Bytes.wrap("aa".getBytes()); private final Bytes keyB = Bytes.wrap("b".getBytes()); - private SessionStore underlyingStore = - new InMemorySessionStore("store-name", Long.MAX_VALUE, "metric-scope"); + private SessionStore underlyingStore; private InternalMockProcessorContext context; private CachingSessionStore cachingStore; private ThreadCache cache; @Before public void before() { + underlyingStore = new InMemorySessionStore("store-name", Long.MAX_VALUE, "metric-scope"); cachingStore = new CachingSessionStore(underlyingStore, SEGMENT_INTERVAL); cache = new ThreadCache(new LogContext("testCache "), MAX_CACHE_SIZE_BYTES, new MockStreamsMetrics(new Metrics())); context = new InternalMockProcessorContext(TestUtils.tempDirectory(), null, null, null, cache); @@ -157,6 +154,21 @@ public void shouldPutFetchAllKeysFromCache() { assertFalse(all.hasNext()); } + @Test + public void shouldPutBackwardFetchAllKeysFromCache() { + cachingStore.put(new Windowed<>(keyA, new SessionWindow(0, 0)), "1".getBytes()); + cachingStore.put(new Windowed<>(keyAA, new SessionWindow(0, 0)), "1".getBytes()); + cachingStore.put(new Windowed<>(keyB, new SessionWindow(0, 0)), "1".getBytes()); + + assertEquals(3, cache.size()); + + final KeyValueIterator, byte[]> all = cachingStore.backwardFindSessions(keyA, keyB, 0, 0); + verifyWindowedKeyValue(all.next(), new Windowed<>(keyB, new SessionWindow(0, 0)), "1"); + verifyWindowedKeyValue(all.next(), new Windowed<>(keyAA, new SessionWindow(0, 0)), "1"); + verifyWindowedKeyValue(all.next(), new Windowed<>(keyA, new SessionWindow(0, 0)), "1"); + assertFalse(all.hasNext()); + } + @Test public void shouldCloseWrappedStoreAndCacheAfterErrorDuringCacheFlush() { setUpCloseTests(); @@ -211,7 +223,7 @@ private void setUpCloseTests() { EasyMock.replay(underlyingStore); cachingStore = new CachingSessionStore(underlyingStore, SEGMENT_INTERVAL); cache = EasyMock.niceMock(ThreadCache.class); - context = new InternalMockProcessorContext(TestUtils.tempDirectory(), null, null, null, cache); + final InternalMockProcessorContext context = new InternalMockProcessorContext(TestUtils.tempDirectory(), null, null, null, cache); context.setRecordContext(new ProcessorRecordContext(10, 0, 0, TOPIC, null)); cachingStore.init((StateStoreContext) context, cachingStore); } @@ -230,6 +242,20 @@ public void shouldPutFetchRangeFromCache() { assertFalse(some.hasNext()); } + @Test + public void shouldPutBackwardFetchRangeFromCache() { + cachingStore.put(new Windowed<>(keyA, new SessionWindow(0, 0)), "1".getBytes()); + cachingStore.put(new Windowed<>(keyAA, new SessionWindow(0, 0)), "1".getBytes()); + cachingStore.put(new Windowed<>(keyB, new SessionWindow(0, 0)), "1".getBytes()); + + assertEquals(3, cache.size()); + + final KeyValueIterator, byte[]> some = cachingStore.backwardFindSessions(keyAA, keyB, 0, 0); + verifyWindowedKeyValue(some.next(), new Windowed<>(keyB, new SessionWindow(0, 0)), "1"); + verifyWindowedKeyValue(some.next(), new Windowed<>(keyAA, new SessionWindow(0, 0)), "1"); + assertFalse(some.hasNext()); + } + @Test public void shouldFetchAllSessionsWithSameRecordKey() { final List, byte[]>> expected = asList( @@ -249,6 +275,26 @@ public void shouldFetchAllSessionsWithSameRecordKey() { verifyKeyValueList(expected, results); } + @Test + public void shouldBackwardFetchAllSessionsWithSameRecordKey() { + final List, byte[]>> expected = asList( + KeyValue.pair(new Windowed<>(keyA, new SessionWindow(0, 0)), "1".getBytes()), + KeyValue.pair(new Windowed<>(keyA, new SessionWindow(10, 10)), "2".getBytes()), + KeyValue.pair(new Windowed<>(keyA, new SessionWindow(100, 100)), "3".getBytes()), + KeyValue.pair(new Windowed<>(keyA, new SessionWindow(1000, 1000)), "4".getBytes()) + ); + for (final KeyValue, byte[]> kv : expected) { + cachingStore.put(kv.key, kv.value); + } + + // add one that shouldn't appear in the results + cachingStore.put(new Windowed<>(keyAA, new SessionWindow(0, 0)), "5".getBytes()); + + final List, byte[]>> results = toList(cachingStore.backwardFetch(keyA)); + Collections.reverse(results); + verifyKeyValueList(expected, results); + } + @Test public void shouldFlushItemsToStoreOnEviction() { final List, byte[]>> added = addSessionsUntilOverflow("a", "b", "c", "d"); @@ -292,15 +338,50 @@ public void shouldFetchCorrectlyAcrossSegments() { final Windowed a1 = new Windowed<>(keyA, new SessionWindow(SEGMENT_INTERVAL * 0, SEGMENT_INTERVAL * 0)); final Windowed a2 = new Windowed<>(keyA, new SessionWindow(SEGMENT_INTERVAL * 1, SEGMENT_INTERVAL * 1)); final Windowed a3 = new Windowed<>(keyA, new SessionWindow(SEGMENT_INTERVAL * 2, SEGMENT_INTERVAL * 2)); + final Windowed a4 = new Windowed<>(keyA, new SessionWindow(SEGMENT_INTERVAL * 3, SEGMENT_INTERVAL * 3)); + final Windowed a5 = new Windowed<>(keyA, new SessionWindow(SEGMENT_INTERVAL * 4, SEGMENT_INTERVAL * 4)); + final Windowed a6 = new Windowed<>(keyA, new SessionWindow(SEGMENT_INTERVAL * 5, SEGMENT_INTERVAL * 5)); cachingStore.put(a1, "1".getBytes()); cachingStore.put(a2, "2".getBytes()); cachingStore.put(a3, "3".getBytes()); cachingStore.flush(); + cachingStore.put(a4, "4".getBytes()); + cachingStore.put(a5, "5".getBytes()); + cachingStore.put(a6, "6".getBytes()); final KeyValueIterator, byte[]> results = - cachingStore.findSessions(keyA, 0, SEGMENT_INTERVAL * 2); + cachingStore.findSessions(keyA, 0, SEGMENT_INTERVAL * 5); assertEquals(a1, results.next().key); assertEquals(a2, results.next().key); assertEquals(a3, results.next().key); + assertEquals(a4, results.next().key); + assertEquals(a5, results.next().key); + assertEquals(a6, results.next().key); + assertFalse(results.hasNext()); + } + + @Test + public void shouldBackwardFetchCorrectlyAcrossSegments() { + final Windowed a1 = new Windowed<>(keyA, new SessionWindow(SEGMENT_INTERVAL * 0, SEGMENT_INTERVAL * 0)); + final Windowed a2 = new Windowed<>(keyA, new SessionWindow(SEGMENT_INTERVAL * 1, SEGMENT_INTERVAL * 1)); + final Windowed a3 = new Windowed<>(keyA, new SessionWindow(SEGMENT_INTERVAL * 2, SEGMENT_INTERVAL * 2)); + final Windowed a4 = new Windowed<>(keyA, new SessionWindow(SEGMENT_INTERVAL * 3, SEGMENT_INTERVAL * 3)); + final Windowed a5 = new Windowed<>(keyA, new SessionWindow(SEGMENT_INTERVAL * 4, SEGMENT_INTERVAL * 4)); + final Windowed a6 = new Windowed<>(keyA, new SessionWindow(SEGMENT_INTERVAL * 5, SEGMENT_INTERVAL * 5)); + cachingStore.put(a1, "1".getBytes()); + cachingStore.put(a2, "2".getBytes()); + cachingStore.put(a3, "3".getBytes()); + cachingStore.flush(); + cachingStore.put(a4, "4".getBytes()); + cachingStore.put(a5, "5".getBytes()); + cachingStore.put(a6, "6".getBytes()); + final KeyValueIterator, byte[]> results = + cachingStore.backwardFindSessions(keyA, 0, SEGMENT_INTERVAL * 5); + assertEquals(a6, results.next().key); + assertEquals(a5, results.next().key); + assertEquals(a4, results.next().key); + assertEquals(a3, results.next().key); + assertEquals(a2, results.next().key); + assertEquals(a1, results.next().key); assertFalse(results.hasNext()); } @@ -319,12 +400,35 @@ public void shouldFetchRangeCorrectlyAcrossSegments() { final KeyValueIterator, byte[]> rangeResults = cachingStore.findSessions(keyA, keyAA, 0, SEGMENT_INTERVAL * 2); - final Set> keys = new HashSet<>(); + final List> keys = new ArrayList<>(); while (rangeResults.hasNext()) { keys.add(rangeResults.next().key); } rangeResults.close(); - assertEquals(mkSet(a1, a2, a3, aa1, aa3), keys); + assertEquals(asList(a1, aa1, a2, a3, aa3), keys); + } + + @Test + public void shouldBackwardFetchRangeCorrectlyAcrossSegments() { + final Windowed a1 = new Windowed<>(keyA, new SessionWindow(SEGMENT_INTERVAL * 0, SEGMENT_INTERVAL * 0)); + final Windowed aa1 = new Windowed<>(keyAA, new SessionWindow(SEGMENT_INTERVAL * 0, SEGMENT_INTERVAL * 0)); + final Windowed a2 = new Windowed<>(keyA, new SessionWindow(SEGMENT_INTERVAL * 1, SEGMENT_INTERVAL * 1)); + final Windowed a3 = new Windowed<>(keyA, new SessionWindow(SEGMENT_INTERVAL * 2, SEGMENT_INTERVAL * 2)); + final Windowed aa3 = new Windowed<>(keyAA, new SessionWindow(SEGMENT_INTERVAL * 2, SEGMENT_INTERVAL * 2)); + cachingStore.put(a1, "1".getBytes()); + cachingStore.put(aa1, "1".getBytes()); + cachingStore.put(a2, "2".getBytes()); + cachingStore.put(a3, "3".getBytes()); + cachingStore.put(aa3, "3".getBytes()); + + final KeyValueIterator, byte[]> rangeResults = + cachingStore.backwardFindSessions(keyA, keyAA, 0, SEGMENT_INTERVAL * 2); + final List> keys = new ArrayList<>(); + while (rangeResults.hasNext()) { + keys.add(rangeResults.next().key); + } + rangeResults.close(); + assertEquals(asList(aa3, a3, a2, aa1, a1), keys); } @Test @@ -473,6 +577,24 @@ public void shouldReturnSameResultsForSingleKeyFindSessionsAndEqualKeyRangeFindS assertFalse(keyRangeIterator.hasNext()); } + @Test + public void shouldReturnSameResultsForSingleKeyFindSessionsBackwardsAndEqualKeyRangeFindSessions() { + cachingStore.put(new Windowed<>(keyA, new SessionWindow(0, 1)), "1".getBytes()); + cachingStore.put(new Windowed<>(keyAA, new SessionWindow(2, 3)), "2".getBytes()); + cachingStore.put(new Windowed<>(keyAA, new SessionWindow(4, 5)), "3".getBytes()); + cachingStore.put(new Windowed<>(keyB, new SessionWindow(6, 7)), "4".getBytes()); + + final KeyValueIterator, byte[]> singleKeyIterator = + cachingStore.backwardFindSessions(keyAA, 0L, 10L); + final KeyValueIterator, byte[]> keyRangeIterator = + cachingStore.backwardFindSessions(keyAA, keyAA, 0L, 10L); + + assertEquals(singleKeyIterator.next(), keyRangeIterator.next()); + assertEquals(singleKeyIterator.next(), keyRangeIterator.next()); + assertFalse(singleKeyIterator.hasNext()); + assertFalse(keyRangeIterator.hasNext()); + } + @Test public void shouldClearNamespaceCacheOnClose() { final Windowed a1 = new Windowed<>(keyA, new SessionWindow(0, 0)); @@ -482,68 +604,90 @@ public void shouldClearNamespaceCacheOnClose() { assertEquals(0, cache.size()); } - @Test(expected = InvalidStateStoreException.class) + @Test public void shouldThrowIfTryingToFetchFromClosedCachingStore() { cachingStore.close(); - cachingStore.fetch(keyA); + assertThrows(InvalidStateStoreException.class, () -> cachingStore.fetch(keyA)); } - @Test(expected = InvalidStateStoreException.class) + @Test public void shouldThrowIfTryingToFindMergeSessionFromClosedCachingStore() { cachingStore.close(); - cachingStore.findSessions(keyA, 0, Long.MAX_VALUE); + assertThrows(InvalidStateStoreException.class, () -> cachingStore.findSessions(keyA, 0, Long.MAX_VALUE)); } - @Test(expected = InvalidStateStoreException.class) + @Test public void shouldThrowIfTryingToRemoveFromClosedCachingStore() { cachingStore.close(); - cachingStore.remove(new Windowed<>(keyA, new SessionWindow(0, 0))); + assertThrows(InvalidStateStoreException.class, () -> cachingStore.remove(new Windowed<>(keyA, new SessionWindow(0, 0)))); } - @Test(expected = InvalidStateStoreException.class) + @Test public void shouldThrowIfTryingToPutIntoClosedCachingStore() { cachingStore.close(); - cachingStore.put(new Windowed<>(keyA, new SessionWindow(0, 0)), "1".getBytes()); + assertThrows(InvalidStateStoreException.class, () -> cachingStore.put(new Windowed<>(keyA, new SessionWindow(0, 0)), "1".getBytes())); } - @Test(expected = NullPointerException.class) + @Test public void shouldThrowNullPointerExceptionOnFindSessionsNullKey() { - cachingStore.findSessions(null, 1L, 2L); + assertThrows(NullPointerException.class, () -> cachingStore.findSessions(null, 1L, 2L)); } - @Test(expected = NullPointerException.class) + @Test public void shouldThrowNullPointerExceptionOnFindSessionsNullFromKey() { - cachingStore.findSessions(null, keyA, 1L, 2L); + assertThrows(NullPointerException.class, () -> cachingStore.findSessions(null, keyA, 1L, 2L)); } - @Test(expected = NullPointerException.class) + @Test public void shouldThrowNullPointerExceptionOnFindSessionsNullToKey() { - cachingStore.findSessions(keyA, null, 1L, 2L); + assertThrows(NullPointerException.class, () -> cachingStore.findSessions(keyA, null, 1L, 2L)); } - @Test(expected = NullPointerException.class) + @Test public void shouldThrowNullPointerExceptionOnFetchNullFromKey() { - cachingStore.fetch(null, keyA); + assertThrows(NullPointerException.class, () -> cachingStore.fetch(null, keyA)); } - @Test(expected = NullPointerException.class) + @Test public void shouldThrowNullPointerExceptionOnFetchNullToKey() { - cachingStore.fetch(keyA, null); + assertThrows(NullPointerException.class, () -> cachingStore.fetch(keyA, null)); } - @Test(expected = NullPointerException.class) + @Test public void shouldThrowNullPointerExceptionOnFetchNullKey() { - cachingStore.fetch(null); + assertThrows(NullPointerException.class, () -> cachingStore.fetch(null)); } - @Test(expected = NullPointerException.class) + @Test public void shouldThrowNullPointerExceptionOnRemoveNullKey() { - cachingStore.remove(null); + assertThrows(NullPointerException.class, () -> cachingStore.remove(null)); } - @Test(expected = NullPointerException.class) + @Test public void shouldThrowNullPointerExceptionOnPutNullKey() { - cachingStore.put(null, "1".getBytes()); + assertThrows(NullPointerException.class, () -> cachingStore.put(null, "1".getBytes())); + } + + @Test + public void shouldNotThrowInvalidRangeExceptionWhenBackwardWithNegativeFromKey() { + final Bytes keyFrom = Bytes.wrap(Serdes.Integer().serializer().serialize("", -1)); + final Bytes keyTo = Bytes.wrap(Serdes.Integer().serializer().serialize("", 1)); + + try (final LogCaptureAppender appender = LogCaptureAppender.createAndRegister(CachingSessionStore.class)) { + final KeyValueIterator, byte[]> iterator = cachingStore.backwardFindSessions(keyFrom, keyTo, 0L, 10L); + assertFalse(iterator.hasNext()); + + final List messages = appender.getMessages(); + assertThat( + messages, + hasItem( + "Returning empty iterator for fetch with invalid key range: from > to." + + " This may be due to range arguments set in the wrong order, " + + "or serdes that don't preserve ordering when lexicographically comparing the serialized bytes." + + " Note that the built-in numerical serdes do not follow this for negative numbers" + ) + ); + } } @Test diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingPersistentSessionStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingPersistentSessionStoreTest.java new file mode 100644 index 0000000000000..d472c7f5637db --- /dev/null +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingPersistentSessionStoreTest.java @@ -0,0 +1,766 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.state.internals; + +import org.apache.kafka.common.metrics.Metrics; +import org.apache.kafka.common.serialization.Deserializer; +import org.apache.kafka.common.serialization.Serdes; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.apache.kafka.common.utils.Bytes; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.streams.KeyValue; +import org.apache.kafka.streams.KeyValueTimestamp; +import org.apache.kafka.streams.errors.InvalidStateStoreException; +import org.apache.kafka.streams.kstream.SessionWindowedDeserializer; +import org.apache.kafka.streams.kstream.Windowed; +import org.apache.kafka.streams.kstream.internals.Change; +import org.apache.kafka.streams.kstream.internals.SessionWindow; +import org.apache.kafka.streams.processor.StateStoreContext; +import org.apache.kafka.streams.processor.internals.MockStreamsMetrics; +import org.apache.kafka.streams.processor.internals.ProcessorRecordContext; +import org.apache.kafka.streams.processor.internals.testutil.LogCaptureAppender; +import org.apache.kafka.streams.state.KeyValueIterator; +import org.apache.kafka.streams.state.SessionStore; +import org.apache.kafka.test.InternalMockProcessorContext; +import org.apache.kafka.test.TestUtils; +import org.easymock.EasyMock; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.Collections; +import java.util.LinkedList; +import java.util.List; +import java.util.Random; + +import static java.util.Arrays.asList; +import static org.apache.kafka.test.StreamsTestUtils.toList; +import static org.apache.kafka.test.StreamsTestUtils.verifyKeyValueList; +import static org.apache.kafka.test.StreamsTestUtils.verifyWindowedKeyValue; +import static org.hamcrest.CoreMatchers.hasItem; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.equalTo; +import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertThrows; +import static org.junit.Assert.assertTrue; + +public class CachingPersistentSessionStoreTest { + + private static final int MAX_CACHE_SIZE_BYTES = 600; + private static final Long DEFAULT_TIMESTAMP = 10L; + private static final long SEGMENT_INTERVAL = 100L; + private static final String TOPIC = "topic"; + private static final String CACHE_NAMESPACE = "0_0-store-name"; + + private final Bytes keyA = Bytes.wrap("a".getBytes()); + private final Bytes keyAA = Bytes.wrap("aa".getBytes()); + private final Bytes keyB = Bytes.wrap("b".getBytes()); + + private SessionStore underlyingStore; + private CachingSessionStore cachingStore; + private ThreadCache cache; + + @Before + public void before() { + final RocksDBSegmentedBytesStore segmented = new RocksDBSegmentedBytesStore( + "store-name", + "metric-scope", + Long.MAX_VALUE, + SEGMENT_INTERVAL, + new SessionKeySchema() + ); + underlyingStore = new RocksDBSessionStore(segmented); + cachingStore = new CachingSessionStore(underlyingStore, SEGMENT_INTERVAL); + cache = new ThreadCache(new LogContext("testCache "), MAX_CACHE_SIZE_BYTES, new MockStreamsMetrics(new Metrics())); + final InternalMockProcessorContext context = + new InternalMockProcessorContext(TestUtils.tempDirectory(), null, null, null, cache); + context.setRecordContext(new ProcessorRecordContext(DEFAULT_TIMESTAMP, 0, 0, TOPIC, null)); + cachingStore.init((StateStoreContext) context, cachingStore); + } + + @After + public void after() { + cachingStore.close(); + } + + @Test + public void shouldPutFetchFromCache() { + cachingStore.put(new Windowed<>(keyA, new SessionWindow(0, 0)), "1".getBytes()); + cachingStore.put(new Windowed<>(keyAA, new SessionWindow(0, 0)), "1".getBytes()); + cachingStore.put(new Windowed<>(keyB, new SessionWindow(0, 0)), "1".getBytes()); + + assertEquals(3, cache.size()); + + final KeyValueIterator, byte[]> a = + cachingStore.findSessions(keyA, 0, 0); + final KeyValueIterator, byte[]> b = + cachingStore.findSessions(keyB, 0, 0); + + verifyWindowedKeyValue(a.next(), new Windowed<>(keyA, new SessionWindow(0, 0)), "1"); + verifyWindowedKeyValue(b.next(), new Windowed<>(keyB, new SessionWindow(0, 0)), "1"); + assertFalse(a.hasNext()); + assertFalse(b.hasNext()); + } + + @Test + public void shouldPutFetchAllKeysFromCache() { + cachingStore.put(new Windowed<>(keyA, new SessionWindow(0, 0)), "1".getBytes()); + cachingStore.put(new Windowed<>(keyAA, new SessionWindow(0, 0)), "1".getBytes()); + cachingStore.put(new Windowed<>(keyB, new SessionWindow(0, 0)), "1".getBytes()); + + assertEquals(3, cache.size()); + + final KeyValueIterator, byte[]> all = + cachingStore.findSessions(keyA, keyB, 0, 0); + verifyWindowedKeyValue(all.next(), new Windowed<>(keyA, new SessionWindow(0, 0)), "1"); + verifyWindowedKeyValue(all.next(), new Windowed<>(keyAA, new SessionWindow(0, 0)), "1"); + verifyWindowedKeyValue(all.next(), new Windowed<>(keyB, new SessionWindow(0, 0)), "1"); + assertFalse(all.hasNext()); + } + + @Test + public void shouldPutBackwardFetchAllKeysFromCache() { + cachingStore.put(new Windowed<>(keyA, new SessionWindow(0, 0)), "1".getBytes()); + cachingStore.put(new Windowed<>(keyAA, new SessionWindow(0, 0)), "1".getBytes()); + cachingStore.put(new Windowed<>(keyB, new SessionWindow(0, 0)), "1".getBytes()); + + assertEquals(3, cache.size()); + + final KeyValueIterator, byte[]> all = + cachingStore.backwardFindSessions(keyA, keyB, 0, 0); + verifyWindowedKeyValue(all.next(), new Windowed<>(keyB, new SessionWindow(0, 0)), "1"); + verifyWindowedKeyValue(all.next(), new Windowed<>(keyAA, new SessionWindow(0, 0)), "1"); + verifyWindowedKeyValue(all.next(), new Windowed<>(keyA, new SessionWindow(0, 0)), "1"); + assertFalse(all.hasNext()); + } + + @Test + public void shouldCloseWrappedStoreAndCacheAfterErrorDuringCacheFlush() { + setUpCloseTests(); + EasyMock.reset(cache); + cache.flush(CACHE_NAMESPACE); + EasyMock.expectLastCall().andThrow(new RuntimeException("Simulating an error on flush")); + EasyMock.replay(cache); + EasyMock.reset(underlyingStore); + underlyingStore.close(); + EasyMock.replay(underlyingStore); + + assertThrows(RuntimeException.class, cachingStore::close); + EasyMock.verify(cache, underlyingStore); + } + + @Test + public void shouldCloseWrappedStoreAfterErrorDuringCacheClose() { + setUpCloseTests(); + EasyMock.reset(cache); + cache.flush(CACHE_NAMESPACE); + cache.close(CACHE_NAMESPACE); + EasyMock.expectLastCall().andThrow(new RuntimeException("Simulating an error on close")); + EasyMock.replay(cache); + EasyMock.reset(underlyingStore); + underlyingStore.close(); + EasyMock.replay(underlyingStore); + + assertThrows(RuntimeException.class, cachingStore::close); + EasyMock.verify(cache, underlyingStore); + } + + @Test + public void shouldCloseCacheAfterErrorDuringWrappedStoreClose() { + setUpCloseTests(); + EasyMock.reset(cache); + cache.flush(CACHE_NAMESPACE); + cache.close(CACHE_NAMESPACE); + EasyMock.replay(cache); + EasyMock.reset(underlyingStore); + underlyingStore.close(); + EasyMock.expectLastCall().andThrow(new RuntimeException("Simulating an error on close")); + EasyMock.replay(underlyingStore); + + assertThrows(RuntimeException.class, cachingStore::close); + EasyMock.verify(cache, underlyingStore); + } + + private void setUpCloseTests() { + underlyingStore = EasyMock.createNiceMock(SessionStore.class); + EasyMock.expect(underlyingStore.name()).andStubReturn("store-name"); + EasyMock.expect(underlyingStore.isOpen()).andStubReturn(true); + EasyMock.replay(underlyingStore); + cachingStore = new CachingSessionStore(underlyingStore, SEGMENT_INTERVAL); + cache = EasyMock.niceMock(ThreadCache.class); + final InternalMockProcessorContext context = + new InternalMockProcessorContext(TestUtils.tempDirectory(), null, null, null, cache); + context.setRecordContext(new ProcessorRecordContext(10, 0, 0, TOPIC, null)); + cachingStore.init((StateStoreContext) context, cachingStore); + } + + @Test + public void shouldPutFetchRangeFromCache() { + cachingStore.put(new Windowed<>(keyA, new SessionWindow(0, 0)), "1".getBytes()); + cachingStore.put(new Windowed<>(keyAA, new SessionWindow(0, 0)), "1".getBytes()); + cachingStore.put(new Windowed<>(keyB, new SessionWindow(0, 0)), "1".getBytes()); + + assertEquals(3, cache.size()); + + final KeyValueIterator, byte[]> some = + cachingStore.findSessions(keyAA, keyB, 0, 0); + verifyWindowedKeyValue(some.next(), new Windowed<>(keyAA, new SessionWindow(0, 0)), "1"); + verifyWindowedKeyValue(some.next(), new Windowed<>(keyB, new SessionWindow(0, 0)), "1"); + assertFalse(some.hasNext()); + } + + @Test + public void shouldPutBackwardFetchRangeFromCache() { + cachingStore.put(new Windowed<>(keyA, new SessionWindow(0, 0)), "1".getBytes()); + cachingStore.put(new Windowed<>(keyAA, new SessionWindow(0, 0)), "1".getBytes()); + cachingStore.put(new Windowed<>(keyB, new SessionWindow(0, 0)), "1".getBytes()); + + assertEquals(3, cache.size()); + + final KeyValueIterator, byte[]> some = + cachingStore.backwardFindSessions(keyAA, keyB, 0, 0); + verifyWindowedKeyValue(some.next(), new Windowed<>(keyB, new SessionWindow(0, 0)), "1"); + verifyWindowedKeyValue(some.next(), new Windowed<>(keyAA, new SessionWindow(0, 0)), "1"); + assertFalse(some.hasNext()); + } + + @Test + public void shouldFetchAllSessionsWithSameRecordKey() { + final List, byte[]>> expected = asList( + KeyValue.pair(new Windowed<>(keyA, new SessionWindow(0, 0)), "1".getBytes()), + KeyValue.pair(new Windowed<>(keyA, new SessionWindow(10, 10)), "2".getBytes()), + KeyValue.pair(new Windowed<>(keyA, new SessionWindow(100, 100)), "3".getBytes()), + KeyValue.pair(new Windowed<>(keyA, new SessionWindow(1000, 1000)), "4".getBytes()) + ); + for (final KeyValue, byte[]> kv : expected) { + cachingStore.put(kv.key, kv.value); + } + + // add one that shouldn't appear in the results + cachingStore.put(new Windowed<>(keyAA, new SessionWindow(0, 0)), "5".getBytes()); + + final List, byte[]>> results = toList(cachingStore.fetch(keyA)); + verifyKeyValueList(expected, results); + } + + @Test + public void shouldBackwardFetchAllSessionsWithSameRecordKey() { + final List, byte[]>> expected = asList( + KeyValue.pair(new Windowed<>(keyA, new SessionWindow(0, 0)), "1".getBytes()), + KeyValue.pair(new Windowed<>(keyA, new SessionWindow(10, 10)), "2".getBytes()), + KeyValue.pair(new Windowed<>(keyA, new SessionWindow(100, 100)), "3".getBytes()), + KeyValue.pair(new Windowed<>(keyA, new SessionWindow(1000, 1000)), "4".getBytes()) + ); + for (final KeyValue, byte[]> kv : expected) { + cachingStore.put(kv.key, kv.value); + } + + // add one that shouldn't appear in the results + cachingStore.put(new Windowed<>(keyAA, new SessionWindow(0, 0)), "5".getBytes()); + + final List, byte[]>> results = toList(cachingStore.backwardFetch(keyA)); + Collections.reverse(results); + verifyKeyValueList(expected, results); + } + + @Test + public void shouldFlushItemsToStoreOnEviction() { + final List, byte[]>> added = addSessionsUntilOverflow("a", "b", "c", "d"); + assertEquals(added.size() - 1, cache.size()); + final KeyValueIterator, byte[]> iterator = + cachingStore.findSessions(added.get(0).key.key(), 0, 0); + final KeyValue, byte[]> next = iterator.next(); + assertEquals(added.get(0).key, next.key); + assertArrayEquals(added.get(0).value, next.value); + } + + @Test + public void shouldQueryItemsInCacheAndStore() { + final List, byte[]>> added = addSessionsUntilOverflow("a"); + final KeyValueIterator, byte[]> iterator = cachingStore.findSessions( + Bytes.wrap("a".getBytes(StandardCharsets.UTF_8)), + 0, + added.size() * 10L + ); + final List, byte[]>> actual = toList(iterator); + verifyKeyValueList(added, actual); + } + + @Test + public void shouldRemove() { + final Windowed a = new Windowed<>(keyA, new SessionWindow(0, 0)); + final Windowed b = new Windowed<>(keyB, new SessionWindow(0, 0)); + cachingStore.put(a, "2".getBytes()); + cachingStore.put(b, "2".getBytes()); + cachingStore.remove(a); + + final KeyValueIterator, byte[]> rangeIter = + cachingStore.findSessions(keyA, 0, 0); + assertFalse(rangeIter.hasNext()); + + assertNull(cachingStore.fetchSession(keyA, 0, 0)); + assertThat(cachingStore.fetchSession(keyB, 0, 0), equalTo("2".getBytes())); + + } + + @Test + public void shouldFetchCorrectlyAcrossSegments() { + final Windowed a1 = new Windowed<>(keyA, new SessionWindow(SEGMENT_INTERVAL * 0, SEGMENT_INTERVAL * 0)); + final Windowed a2 = new Windowed<>(keyA, new SessionWindow(SEGMENT_INTERVAL * 1, SEGMENT_INTERVAL * 1)); + final Windowed a3 = new Windowed<>(keyA, new SessionWindow(SEGMENT_INTERVAL * 2, SEGMENT_INTERVAL * 2)); + final Windowed a4 = new Windowed<>(keyA, new SessionWindow(SEGMENT_INTERVAL * 3, SEGMENT_INTERVAL * 3)); + final Windowed a5 = new Windowed<>(keyA, new SessionWindow(SEGMENT_INTERVAL * 4, SEGMENT_INTERVAL * 4)); + final Windowed a6 = new Windowed<>(keyA, new SessionWindow(SEGMENT_INTERVAL * 5, SEGMENT_INTERVAL * 5)); + cachingStore.put(a1, "1".getBytes()); + cachingStore.put(a2, "2".getBytes()); + cachingStore.put(a3, "3".getBytes()); + cachingStore.flush(); + cachingStore.put(a4, "4".getBytes()); + cachingStore.put(a5, "5".getBytes()); + cachingStore.put(a6, "6".getBytes()); + final KeyValueIterator, byte[]> results = + cachingStore.findSessions(keyA, 0, SEGMENT_INTERVAL * 5); + assertEquals(a1, results.next().key); + assertEquals(a2, results.next().key); + assertEquals(a3, results.next().key); + assertEquals(a4, results.next().key); + assertEquals(a5, results.next().key); + assertEquals(a6, results.next().key); + assertFalse(results.hasNext()); + } + + @Test + public void shouldBackwardFetchCorrectlyAcrossSegments() { + final Windowed a1 = new Windowed<>(keyA, new SessionWindow(SEGMENT_INTERVAL * 0, SEGMENT_INTERVAL * 0)); + final Windowed a2 = new Windowed<>(keyA, new SessionWindow(SEGMENT_INTERVAL * 1, SEGMENT_INTERVAL * 1)); + final Windowed a3 = new Windowed<>(keyA, new SessionWindow(SEGMENT_INTERVAL * 2, SEGMENT_INTERVAL * 2)); + final Windowed a4 = new Windowed<>(keyA, new SessionWindow(SEGMENT_INTERVAL * 3, SEGMENT_INTERVAL * 3)); + final Windowed a5 = new Windowed<>(keyA, new SessionWindow(SEGMENT_INTERVAL * 4, SEGMENT_INTERVAL * 4)); + final Windowed a6 = new Windowed<>(keyA, new SessionWindow(SEGMENT_INTERVAL * 5, SEGMENT_INTERVAL * 5)); + cachingStore.put(a1, "1".getBytes()); + cachingStore.put(a2, "2".getBytes()); + cachingStore.put(a3, "3".getBytes()); + cachingStore.flush(); + cachingStore.put(a4, "4".getBytes()); + cachingStore.put(a5, "5".getBytes()); + cachingStore.put(a6, "6".getBytes()); + final KeyValueIterator, byte[]> results = + cachingStore.backwardFindSessions(keyA, 0, SEGMENT_INTERVAL * 5); + assertEquals(a6, results.next().key); + assertEquals(a5, results.next().key); + assertEquals(a4, results.next().key); + assertEquals(a3, results.next().key); + assertEquals(a2, results.next().key); + assertEquals(a1, results.next().key); + assertFalse(results.hasNext()); + } + + @Test + public void shouldFetchRangeCorrectlyAcrossSegments() { + final Windowed a1 = new Windowed<>(keyA, new SessionWindow(SEGMENT_INTERVAL * 0, SEGMENT_INTERVAL * 0)); + final Windowed aa1 = new Windowed<>(keyAA, new SessionWindow(SEGMENT_INTERVAL * 0, SEGMENT_INTERVAL * 0)); + final Windowed a2 = new Windowed<>(keyA, new SessionWindow(SEGMENT_INTERVAL * 1, SEGMENT_INTERVAL * 1)); + final Windowed a3 = new Windowed<>(keyA, new SessionWindow(SEGMENT_INTERVAL * 2, SEGMENT_INTERVAL * 2)); + final Windowed aa3 = new Windowed<>(keyAA, new SessionWindow(SEGMENT_INTERVAL * 2, SEGMENT_INTERVAL * 2)); + cachingStore.put(a1, "1".getBytes()); + cachingStore.put(aa1, "1".getBytes()); + cachingStore.put(a2, "2".getBytes()); + cachingStore.put(a3, "3".getBytes()); + cachingStore.put(aa3, "3".getBytes()); + + final KeyValueIterator, byte[]> rangeResults = + cachingStore.findSessions(keyA, keyAA, 0, SEGMENT_INTERVAL * 2); + final List> keys = new ArrayList<>(); + while (rangeResults.hasNext()) { + keys.add(rangeResults.next().key); + } + rangeResults.close(); + assertEquals(asList(a1, aa1, a2, a3, aa3), keys); + } + + @Test + public void shouldBackwardFetchRangeCorrectlyAcrossSegments() { + final Windowed a1 = new Windowed<>(keyA, new SessionWindow(SEGMENT_INTERVAL * 0, SEGMENT_INTERVAL * 0)); + final Windowed aa1 = new Windowed<>(keyAA, new SessionWindow(SEGMENT_INTERVAL * 0, SEGMENT_INTERVAL * 0)); + final Windowed a2 = new Windowed<>(keyA, new SessionWindow(SEGMENT_INTERVAL * 1, SEGMENT_INTERVAL * 1)); + final Windowed a3 = new Windowed<>(keyA, new SessionWindow(SEGMENT_INTERVAL * 2, SEGMENT_INTERVAL * 2)); + final Windowed aa3 = new Windowed<>(keyAA, new SessionWindow(SEGMENT_INTERVAL * 2, SEGMENT_INTERVAL * 2)); + cachingStore.put(a1, "1".getBytes()); + cachingStore.put(aa1, "1".getBytes()); + cachingStore.put(a2, "2".getBytes()); + cachingStore.put(a3, "3".getBytes()); + cachingStore.put(aa3, "3".getBytes()); + + final KeyValueIterator, byte[]> rangeResults = + cachingStore.backwardFindSessions(keyA, keyAA, 0, SEGMENT_INTERVAL * 2); + final List> keys = new ArrayList<>(); + while (rangeResults.hasNext()) { + keys.add(rangeResults.next().key); + } + rangeResults.close(); + assertEquals(asList(aa3, a3, a2, aa1, a1), keys); + } + + @Test + public void shouldSetFlushListener() { + assertTrue(cachingStore.setFlushListener(null, true)); + assertTrue(cachingStore.setFlushListener(null, false)); + } + + @Test + public void shouldForwardChangedValuesDuringFlush() { + final Windowed a = new Windowed<>(keyA, new SessionWindow(2, 4)); + final Windowed b = new Windowed<>(keyA, new SessionWindow(1, 2)); + final Windowed aDeserialized = new Windowed<>("a", new SessionWindow(2, 4)); + final Windowed bDeserialized = new Windowed<>("a", new SessionWindow(1, 2)); + final CacheFlushListenerStub, String> flushListener = + new CacheFlushListenerStub<>( + new SessionWindowedDeserializer<>(new StringDeserializer()), + new StringDeserializer() + ); + cachingStore.setFlushListener(flushListener, true); + + cachingStore.put(b, "1".getBytes()); + cachingStore.flush(); + + assertEquals( + Collections.singletonList( + new KeyValueTimestamp<>( + bDeserialized, + new Change<>("1", null), + DEFAULT_TIMESTAMP + ) + ), + flushListener.forwarded + ); + flushListener.forwarded.clear(); + + cachingStore.put(a, "1".getBytes()); + cachingStore.flush(); + + assertEquals( + Collections.singletonList( + new KeyValueTimestamp<>( + aDeserialized, + new Change<>("1", null), + DEFAULT_TIMESTAMP + ) + ), + flushListener.forwarded + ); + flushListener.forwarded.clear(); + + cachingStore.put(a, "2".getBytes()); + cachingStore.flush(); + + assertEquals( + Collections.singletonList( + new KeyValueTimestamp<>( + aDeserialized, + new Change<>("2", "1"), + DEFAULT_TIMESTAMP + ) + ), + flushListener.forwarded + ); + flushListener.forwarded.clear(); + + cachingStore.remove(a); + cachingStore.flush(); + + assertEquals( + Collections.singletonList( + new KeyValueTimestamp<>( + aDeserialized, + new Change<>(null, "2"), + DEFAULT_TIMESTAMP + ) + ), + flushListener.forwarded + ); + flushListener.forwarded.clear(); + + cachingStore.put(a, "1".getBytes()); + cachingStore.put(a, "2".getBytes()); + cachingStore.remove(a); + cachingStore.flush(); + + assertEquals( + Collections.emptyList(), + flushListener.forwarded + ); + flushListener.forwarded.clear(); + } + + @Test + public void shouldNotForwardChangedValuesDuringFlushWhenSendOldValuesDisabled() { + final Windowed a = new Windowed<>(keyA, new SessionWindow(0, 0)); + final Windowed aDeserialized = new Windowed<>("a", new SessionWindow(0, 0)); + final CacheFlushListenerStub, String> flushListener = + new CacheFlushListenerStub<>( + new SessionWindowedDeserializer<>(new StringDeserializer()), + new StringDeserializer()); + cachingStore.setFlushListener(flushListener, false); + + cachingStore.put(a, "1".getBytes()); + cachingStore.flush(); + + cachingStore.put(a, "2".getBytes()); + cachingStore.flush(); + + cachingStore.remove(a); + cachingStore.flush(); + + assertEquals( + asList( + new KeyValueTimestamp<>( + aDeserialized, + new Change<>("1", null), + DEFAULT_TIMESTAMP + ), + new KeyValueTimestamp<>( + aDeserialized, + new Change<>("2", null), + DEFAULT_TIMESTAMP + ), + new KeyValueTimestamp<>( + aDeserialized, + new Change<>(null, null), + DEFAULT_TIMESTAMP + ) + ), + flushListener.forwarded + ); + flushListener.forwarded.clear(); + + cachingStore.put(a, "1".getBytes()); + cachingStore.put(a, "2".getBytes()); + cachingStore.remove(a); + cachingStore.flush(); + + assertEquals( + Collections.emptyList(), + flushListener.forwarded + ); + flushListener.forwarded.clear(); + } + + @Test + public void shouldReturnSameResultsForSingleKeyFindSessionsAndEqualKeyRangeFindSessions() { + cachingStore.put(new Windowed<>(keyA, new SessionWindow(0, 1)), "1".getBytes()); + cachingStore.put(new Windowed<>(keyAA, new SessionWindow(2, 3)), "2".getBytes()); + cachingStore.put(new Windowed<>(keyAA, new SessionWindow(4, 5)), "3".getBytes()); + cachingStore.put(new Windowed<>(keyB, new SessionWindow(6, 7)), "4".getBytes()); + + final KeyValueIterator, byte[]> singleKeyIterator = + cachingStore.findSessions(keyAA, 0L, 10L); + final KeyValueIterator, byte[]> keyRangeIterator = + cachingStore.findSessions(keyAA, keyAA, 0L, 10L); + + assertEquals(singleKeyIterator.next(), keyRangeIterator.next()); + assertEquals(singleKeyIterator.next(), keyRangeIterator.next()); + assertFalse(singleKeyIterator.hasNext()); + assertFalse(keyRangeIterator.hasNext()); + } + + @Test + public void shouldReturnSameResultsForSingleKeyFindSessionsBackwardsAndEqualKeyRangeFindSessions() { + cachingStore.put(new Windowed<>(keyA, new SessionWindow(0, 1)), "1".getBytes()); + cachingStore.put(new Windowed<>(keyAA, new SessionWindow(2, 3)), "2".getBytes()); + cachingStore.put(new Windowed<>(keyAA, new SessionWindow(4, 5)), "3".getBytes()); + cachingStore.put(new Windowed<>(keyB, new SessionWindow(6, 7)), "4".getBytes()); + + final KeyValueIterator, byte[]> singleKeyIterator = + cachingStore.backwardFindSessions(keyAA, 0L, 10L); + final KeyValueIterator, byte[]> keyRangeIterator = + cachingStore.backwardFindSessions(keyAA, keyAA, 0L, 10L); + + assertEquals(singleKeyIterator.next(), keyRangeIterator.next()); + assertEquals(singleKeyIterator.next(), keyRangeIterator.next()); + assertFalse(singleKeyIterator.hasNext()); + assertFalse(keyRangeIterator.hasNext()); + } + + @Test + public void shouldClearNamespaceCacheOnClose() { + final Windowed a1 = new Windowed<>(keyA, new SessionWindow(0, 0)); + cachingStore.put(a1, "1".getBytes()); + assertEquals(1, cache.size()); + cachingStore.close(); + assertEquals(0, cache.size()); + } + + @Test + public void shouldThrowIfTryingToFetchFromClosedCachingStore() { + cachingStore.close(); + assertThrows(InvalidStateStoreException.class, () -> cachingStore.fetch(keyA)); + } + + @Test + public void shouldThrowIfTryingToFindMergeSessionFromClosedCachingStore() { + cachingStore.close(); + assertThrows(InvalidStateStoreException.class, () -> cachingStore.findSessions(keyA, 0, Long.MAX_VALUE)); + } + + @Test + public void shouldThrowIfTryingToRemoveFromClosedCachingStore() { + cachingStore.close(); + assertThrows(InvalidStateStoreException.class, () -> cachingStore.remove(new Windowed<>(keyA, new SessionWindow(0, 0)))); + } + + @Test + public void shouldThrowIfTryingToPutIntoClosedCachingStore() { + cachingStore.close(); + assertThrows(InvalidStateStoreException.class, () -> cachingStore.put(new Windowed<>(keyA, new SessionWindow(0, 0)), "1".getBytes())); + } + + @Test + public void shouldThrowNullPointerExceptionOnFindSessionsNullKey() { + assertThrows(NullPointerException.class, () -> cachingStore.findSessions(null, 1L, 2L)); + } + + @Test + public void shouldThrowNullPointerExceptionOnFindSessionsNullFromKey() { + assertThrows(NullPointerException.class, () -> cachingStore.findSessions(null, keyA, 1L, 2L)); + } + + @Test + public void shouldThrowNullPointerExceptionOnFindSessionsNullToKey() { + assertThrows(NullPointerException.class, () -> cachingStore.findSessions(keyA, null, 1L, 2L)); + } + + @Test + public void shouldThrowNullPointerExceptionOnFetchNullFromKey() { + assertThrows(NullPointerException.class, () -> cachingStore.fetch(null, keyA)); + } + + @Test + public void shouldThrowNullPointerExceptionOnFetchNullToKey() { + assertThrows(NullPointerException.class, () -> cachingStore.fetch(keyA, null)); + } + + @Test + public void shouldThrowNullPointerExceptionOnFetchNullKey() { + assertThrows(NullPointerException.class, () -> cachingStore.fetch(null)); + } + + @Test + public void shouldThrowNullPointerExceptionOnRemoveNullKey() { + assertThrows(NullPointerException.class, () -> cachingStore.remove(null)); + } + + @Test + public void shouldThrowNullPointerExceptionOnPutNullKey() { + assertThrows(NullPointerException.class, () -> cachingStore.put(null, "1".getBytes())); + } + + @Test + public void shouldNotThrowInvalidRangeExceptionWhenBackwardWithNegativeFromKey() { + final Bytes keyFrom = Bytes.wrap(Serdes.Integer().serializer().serialize("", -1)); + final Bytes keyTo = Bytes.wrap(Serdes.Integer().serializer().serialize("", 1)); + + try (final LogCaptureAppender appender = LogCaptureAppender.createAndRegister(CachingSessionStore.class)) { + final KeyValueIterator, byte[]> iterator = + cachingStore.backwardFindSessions(keyFrom, keyTo, 0L, 10L); + assertFalse(iterator.hasNext()); + + final List messages = appender.getMessages(); + assertThat( + messages, + hasItem( + "Returning empty iterator for fetch with invalid key range: from > to." + + " This may be due to range arguments set in the wrong order, " + + "or serdes that don't preserve ordering when lexicographically comparing the serialized bytes." + + " Note that the built-in numerical serdes do not follow this for negative numbers" + ) + ); + } + } + + @Test + public void shouldNotThrowInvalidRangeExceptionWithNegativeFromKey() { + final Bytes keyFrom = Bytes.wrap(Serdes.Integer().serializer().serialize("", -1)); + final Bytes keyTo = Bytes.wrap(Serdes.Integer().serializer().serialize("", 1)); + + try (final LogCaptureAppender appender = LogCaptureAppender.createAndRegister(CachingSessionStore.class)) { + final KeyValueIterator, byte[]> iterator = cachingStore.findSessions(keyFrom, keyTo, 0L, 10L); + assertFalse(iterator.hasNext()); + + final List messages = appender.getMessages(); + assertThat( + messages, + hasItem( + "Returning empty iterator for fetch with invalid key range: from > to." + + " This may be due to range arguments set in the wrong order, " + + "or serdes that don't preserve ordering when lexicographically comparing the serialized bytes." + + " Note that the built-in numerical serdes do not follow this for negative numbers" + ) + ); + } + } + + private List, byte[]>> addSessionsUntilOverflow(final String... sessionIds) { + final Random random = new Random(); + final List, byte[]>> results = new ArrayList<>(); + while (cache.size() == results.size()) { + final String sessionId = sessionIds[random.nextInt(sessionIds.length)]; + addSingleSession(sessionId, results); + } + return results; + } + + private void addSingleSession(final String sessionId, final List, byte[]>> allSessions) { + final int timestamp = allSessions.size() * 10; + final Windowed key = new Windowed<>(Bytes.wrap(sessionId.getBytes()), new SessionWindow(timestamp, timestamp)); + final byte[] value = "1".getBytes(); + cachingStore.put(key, value); + allSessions.add(KeyValue.pair(key, value)); + } + + public static class CacheFlushListenerStub implements CacheFlushListener { + private final Deserializer keyDeserializer; + private final Deserializer valueDesializer; + private final List>> forwarded = new LinkedList<>(); + + CacheFlushListenerStub(final Deserializer keyDeserializer, + final Deserializer valueDesializer) { + this.keyDeserializer = keyDeserializer; + this.valueDesializer = valueDesializer; + } + + @Override + public void apply(final byte[] key, + final byte[] newValue, + final byte[] oldValue, + final long timestamp) { + forwarded.add( + new KeyValueTimestamp<>( + keyDeserializer.deserialize(null, key), + new Change<>( + valueDesializer.deserialize(null, newValue), + valueDesializer.deserialize(null, oldValue)), + timestamp + ) + ); + } + } +} diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingWindowStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingPersistentWindowStoreTest.java similarity index 99% rename from streams/src/test/java/org/apache/kafka/streams/state/internals/CachingWindowStoreTest.java rename to streams/src/test/java/org/apache/kafka/streams/state/internals/CachingPersistentWindowStoreTest.java index 2a04c482d1623..86ee164b07ac9 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingWindowStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingPersistentWindowStoreTest.java @@ -75,7 +75,7 @@ import static org.junit.Assert.assertThrows; import static org.junit.Assert.assertTrue; -public class CachingWindowStoreTest { +public class CachingPersistentWindowStoreTest { private static final int MAX_CACHE_SIZE_BYTES = 150; private static final long DEFAULT_TIMESTAMP = 10L; @@ -88,7 +88,7 @@ public class CachingWindowStoreTest { private RocksDBSegmentedBytesStore bytesStore; private WindowStore underlyingStore; private CachingWindowStore cachingStore; - private CachingKeyValueStoreTest.CacheFlushListenerStub, String> cacheListener; + private CacheFlushListenerStub, String> cacheListener; private ThreadCache cache; private WindowKeySchema keySchema; @@ -99,7 +99,7 @@ public void setUp() { underlyingStore = new RocksDBWindowStore(bytesStore, false, WINDOW_SIZE); final TimeWindowedDeserializer keyDeserializer = new TimeWindowedDeserializer<>(new StringDeserializer(), WINDOW_SIZE); keyDeserializer.setIsChangelogTopic(true); - cacheListener = new CachingKeyValueStoreTest.CacheFlushListenerStub<>(keyDeserializer, new StringDeserializer()); + cacheListener = new CacheFlushListenerStub<>(keyDeserializer, new StringDeserializer()); cachingStore = new CachingWindowStore(underlyingStore, WINDOW_SIZE, SEGMENT_INTERVAL); cachingStore.setFlushListener(cacheListener, false); cache = new ThreadCache(new LogContext("testCache "), MAX_CACHE_SIZE_BYTES, new MockStreamsMetrics(new Metrics())); diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingSessionBytesStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingSessionBytesStoreTest.java index c55c4e159c81d..8fdbd33cef964 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingSessionBytesStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingSessionBytesStoreTest.java @@ -130,6 +130,16 @@ public void shouldDelegateToUnderlyingStoreWhenFetching() { EasyMock.verify(inner); } + @Test + public void shouldDelegateToUnderlyingStoreWhenBackwardFetching() { + EasyMock.expect(inner.backwardFetch(bytesKey)).andReturn(KeyValueIterators.emptyIterator()); + + init(); + + store.backwardFetch(bytesKey); + EasyMock.verify(inner); + } + @Test public void shouldDelegateToUnderlyingStoreWhenFetchingRange() { EasyMock.expect(inner.fetch(bytesKey, bytesKey)).andReturn(KeyValueIterators.emptyIterator()); @@ -140,6 +150,16 @@ public void shouldDelegateToUnderlyingStoreWhenFetchingRange() { EasyMock.verify(inner); } + @Test + public void shouldDelegateToUnderlyingStoreWhenBackwardFetchingRange() { + EasyMock.expect(inner.backwardFetch(bytesKey, bytesKey)).andReturn(KeyValueIterators.emptyIterator()); + + init(); + + store.backwardFetch(bytesKey, bytesKey); + EasyMock.verify(inner); + } + @Test public void shouldDelegateToUnderlyingStoreWhenFindingSessions() { EasyMock.expect(inner.findSessions(bytesKey, 0, 1)).andReturn(KeyValueIterators.emptyIterator()); @@ -150,6 +170,16 @@ public void shouldDelegateToUnderlyingStoreWhenFindingSessions() { EasyMock.verify(inner); } + @Test + public void shouldDelegateToUnderlyingStoreWhenBackwardFindingSessions() { + EasyMock.expect(inner.backwardFindSessions(bytesKey, 0, 1)).andReturn(KeyValueIterators.emptyIterator()); + + init(); + + store.backwardFindSessions(bytesKey, 0, 1); + EasyMock.verify(inner); + } + @Test public void shouldDelegateToUnderlyingStoreWhenFindingSessionRange() { EasyMock.expect(inner.findSessions(bytesKey, bytesKey, 0, 1)).andReturn(KeyValueIterators.emptyIterator()); @@ -160,6 +190,16 @@ public void shouldDelegateToUnderlyingStoreWhenFindingSessionRange() { EasyMock.verify(inner); } + @Test + public void shouldDelegateToUnderlyingStoreWhenBackwardFindingSessionRange() { + EasyMock.expect(inner.backwardFindSessions(bytesKey, bytesKey, 0, 1)).andReturn(KeyValueIterators.emptyIterator()); + + init(); + + store.backwardFindSessions(bytesKey, bytesKey, 0, 1); + EasyMock.verify(inner); + } + @Test public void shouldFlushUnderlyingStore() { inner.flush(); diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/MergedSortedCacheWrappedSessionStoreIteratorTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/MergedSortedCacheWrappedSessionStoreIteratorTest.java index 617ff362b694a..4bd125a2cb114 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/MergedSortedCacheWrappedSessionStoreIteratorTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/MergedSortedCacheWrappedSessionStoreIteratorTest.java @@ -55,56 +55,101 @@ public long segmentId(final Bytes key) { @Test public void shouldHaveNextFromStore() { - final MergedSortedCacheSessionStoreIterator mergeIterator = createIterator(storeKvs, Collections.emptyIterator()); + final MergedSortedCacheSessionStoreIterator mergeIterator = createIterator(storeKvs, Collections.emptyIterator(), false); + assertTrue(mergeIterator.hasNext()); + } + + @Test + public void shouldHaveNextFromReverseStore() { + final MergedSortedCacheSessionStoreIterator mergeIterator = createIterator(storeKvs, Collections.emptyIterator(), true); assertTrue(mergeIterator.hasNext()); } @Test public void shouldGetNextFromStore() { - final MergedSortedCacheSessionStoreIterator mergeIterator = createIterator(storeKvs, Collections.emptyIterator()); + final MergedSortedCacheSessionStoreIterator mergeIterator = createIterator(storeKvs, Collections.emptyIterator(), false); + assertThat(mergeIterator.next(), equalTo(KeyValue.pair(new Windowed<>(storeKey, storeWindow), storeKey.get()))); + } + + @Test + public void shouldGetNextFromReverseStore() { + final MergedSortedCacheSessionStoreIterator mergeIterator = createIterator(storeKvs, Collections.emptyIterator(), true); assertThat(mergeIterator.next(), equalTo(KeyValue.pair(new Windowed<>(storeKey, storeWindow), storeKey.get()))); } @Test public void shouldPeekNextKeyFromStore() { - final MergedSortedCacheSessionStoreIterator mergeIterator = createIterator(storeKvs, Collections.emptyIterator()); + final MergedSortedCacheSessionStoreIterator mergeIterator = createIterator(storeKvs, Collections.emptyIterator(), false); + assertThat(mergeIterator.peekNextKey(), equalTo(new Windowed<>(storeKey, storeWindow))); + } + + @Test + public void shouldPeekNextKeyFromReverseStore() { + final MergedSortedCacheSessionStoreIterator mergeIterator = createIterator(storeKvs, Collections.emptyIterator(), true); assertThat(mergeIterator.peekNextKey(), equalTo(new Windowed<>(storeKey, storeWindow))); } @Test public void shouldHaveNextFromCache() { - final MergedSortedCacheSessionStoreIterator mergeIterator = createIterator(Collections.emptyIterator(), cacheKvs); + final MergedSortedCacheSessionStoreIterator mergeIterator = createIterator(Collections.emptyIterator(), cacheKvs, false); + assertTrue(mergeIterator.hasNext()); + } + + @Test + public void shouldHaveNextFromReverseCache() { + final MergedSortedCacheSessionStoreIterator mergeIterator = createIterator(Collections.emptyIterator(), cacheKvs, true); assertTrue(mergeIterator.hasNext()); } @Test public void shouldGetNextFromCache() { - final MergedSortedCacheSessionStoreIterator mergeIterator = createIterator(Collections.emptyIterator(), cacheKvs); + final MergedSortedCacheSessionStoreIterator mergeIterator = createIterator(Collections.emptyIterator(), cacheKvs, false); + assertThat(mergeIterator.next(), equalTo(KeyValue.pair(new Windowed<>(cacheKey, cacheWindow), cacheKey.get()))); + } + + @Test + public void shouldGetNextFromReverseCache() { + final MergedSortedCacheSessionStoreIterator mergeIterator = createIterator(Collections.emptyIterator(), cacheKvs, true); assertThat(mergeIterator.next(), equalTo(KeyValue.pair(new Windowed<>(cacheKey, cacheWindow), cacheKey.get()))); } @Test public void shouldPeekNextKeyFromCache() { - final MergedSortedCacheSessionStoreIterator mergeIterator = createIterator(Collections.emptyIterator(), cacheKvs); + final MergedSortedCacheSessionStoreIterator mergeIterator = createIterator(Collections.emptyIterator(), cacheKvs, false); + assertThat(mergeIterator.peekNextKey(), equalTo(new Windowed<>(cacheKey, cacheWindow))); + } + + @Test + public void shouldPeekNextKeyFromReverseCache() { + final MergedSortedCacheSessionStoreIterator mergeIterator = createIterator(Collections.emptyIterator(), cacheKvs, true); assertThat(mergeIterator.peekNextKey(), equalTo(new Windowed<>(cacheKey, cacheWindow))); } @Test public void shouldIterateBothStoreAndCache() { - final MergedSortedCacheSessionStoreIterator iterator = createIterator(storeKvs, cacheKvs); + final MergedSortedCacheSessionStoreIterator iterator = createIterator(storeKvs, cacheKvs, true); assertThat(iterator.next(), equalTo(KeyValue.pair(new Windowed<>(storeKey, storeWindow), storeKey.get()))); assertThat(iterator.next(), equalTo(KeyValue.pair(new Windowed<>(cacheKey, cacheWindow), cacheKey.get()))); assertFalse(iterator.hasNext()); } + @Test + public void shouldReverseIterateBothStoreAndCache() { + final MergedSortedCacheSessionStoreIterator iterator = createIterator(storeKvs, cacheKvs, false); + assertThat(iterator.next(), equalTo(KeyValue.pair(new Windowed<>(cacheKey, cacheWindow), cacheKey.get()))); + assertThat(iterator.next(), equalTo(KeyValue.pair(new Windowed<>(storeKey, storeWindow), storeKey.get()))); + assertFalse(iterator.hasNext()); + } + private MergedSortedCacheSessionStoreIterator createIterator(final Iterator, byte[]>> storeKvs, - final Iterator> cacheKvs) { + final Iterator> cacheKvs, + final boolean forward) { final DelegatingPeekingKeyValueIterator, byte[]> storeIterator = new DelegatingPeekingKeyValueIterator<>("store", new KeyValueIteratorStub<>(storeKvs)); final PeekingKeyValueIterator cacheIterator = new DelegatingPeekingKeyValueIterator<>("cache", new KeyValueIteratorStub<>(cacheKvs)); - return new MergedSortedCacheSessionStoreIterator(cacheIterator, storeIterator, SINGLE_SEGMENT_CACHE_FUNCTION); + return new MergedSortedCacheSessionStoreIterator(cacheIterator, storeIterator, SINGLE_SEGMENT_CACHE_FUNCTION, forward); } } diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredSessionStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredSessionStoreTest.java index a77dd077d72a7..6b1889aa305ba 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredSessionStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredSessionStoreTest.java @@ -302,6 +302,26 @@ public void shouldFindSessionsFromStoreAndRecordFetchMetric() { verify(innerStore); } + @Test + public void shouldBackwardFindSessionsFromStoreAndRecordFetchMetric() { + expect(innerStore.backwardFindSessions(KEY_BYTES, 0, 0)) + .andReturn( + new KeyValueIteratorStub<>( + Collections.singleton(KeyValue.pair(WINDOWED_KEY_BYTES, VALUE_BYTES)).iterator() + ) + ); + init(); + + final KeyValueIterator, String> iterator = store.backwardFindSessions(KEY, 0, 0); + assertThat(iterator.next().value, equalTo(VALUE)); + assertFalse(iterator.hasNext()); + iterator.close(); + + final KafkaMetric metric = metric("fetch-rate"); + assertTrue((Double) metric.metricValue() > 0); + verify(innerStore); + } + @Test public void shouldFindSessionRangeFromStoreAndRecordFetchMetric() { expect(innerStore.findSessions(KEY_BYTES, KEY_BYTES, 0, 0)) @@ -319,6 +339,26 @@ public void shouldFindSessionRangeFromStoreAndRecordFetchMetric() { verify(innerStore); } + @Test + public void shouldBackwardFindSessionRangeFromStoreAndRecordFetchMetric() { + expect(innerStore.backwardFindSessions(KEY_BYTES, KEY_BYTES, 0, 0)) + .andReturn( + new KeyValueIteratorStub<>( + Collections.singleton(KeyValue.pair(WINDOWED_KEY_BYTES, VALUE_BYTES)).iterator() + ) + ); + init(); + + final KeyValueIterator, String> iterator = store.backwardFindSessions(KEY, KEY, 0, 0); + assertThat(iterator.next().value, equalTo(VALUE)); + assertFalse(iterator.hasNext()); + iterator.close(); + + final KafkaMetric metric = metric("fetch-rate"); + assertTrue((Double) metric.metricValue() > 0); + verify(innerStore); + } + @Test public void shouldRemoveFromStoreAndRecordRemoveMetric() { innerStore.remove(WINDOWED_KEY_BYTES); @@ -350,6 +390,26 @@ public void shouldFetchForKeyAndRecordFetchMetric() { verify(innerStore); } + @Test + public void shouldBackwardFetchForKeyAndRecordFetchMetric() { + expect(innerStore.backwardFetch(KEY_BYTES)) + .andReturn( + new KeyValueIteratorStub<>( + Collections.singleton(KeyValue.pair(WINDOWED_KEY_BYTES, VALUE_BYTES)).iterator() + ) + ); + init(); + + final KeyValueIterator, String> iterator = store.backwardFetch(KEY); + assertThat(iterator.next().value, equalTo(VALUE)); + assertFalse(iterator.hasNext()); + iterator.close(); + + final KafkaMetric metric = metric("fetch-rate"); + assertTrue((Double) metric.metricValue() > 0); + verify(innerStore); + } + @Test public void shouldFetchRangeFromStoreAndRecordFetchMetric() { expect(innerStore.fetch(KEY_BYTES, KEY_BYTES)) @@ -367,6 +427,26 @@ public void shouldFetchRangeFromStoreAndRecordFetchMetric() { verify(innerStore); } + @Test + public void shouldBackwardFetchRangeFromStoreAndRecordFetchMetric() { + expect(innerStore.backwardFetch(KEY_BYTES, KEY_BYTES)) + .andReturn( + new KeyValueIteratorStub<>( + Collections.singleton(KeyValue.pair(WINDOWED_KEY_BYTES, VALUE_BYTES)).iterator() + ) + ); + init(); + + final KeyValueIterator, String> iterator = store.backwardFetch(KEY, KEY); + assertThat(iterator.next().value, equalTo(VALUE)); + assertFalse(iterator.hasNext()); + iterator.close(); + + final KafkaMetric metric = metric("fetch-rate"); + assertTrue((Double) metric.metricValue() > 0); + verify(innerStore); + } + @Test public void shouldRecordRestoreTimeOnInit() { init(); diff --git a/streams/src/test/java/org/apache/kafka/test/ReadOnlySessionStoreStub.java b/streams/src/test/java/org/apache/kafka/test/ReadOnlySessionStoreStub.java index a2924fc6568f3..ff37e2586cd94 100644 --- a/streams/src/test/java/org/apache/kafka/test/ReadOnlySessionStoreStub.java +++ b/streams/src/test/java/org/apache/kafka/test/ReadOnlySessionStoreStub.java @@ -32,7 +32,7 @@ import java.util.TreeMap; public class ReadOnlySessionStoreStub implements ReadOnlySessionStore, StateStore { - private NavigableMap, V>>> sessions = new TreeMap<>(); + private final NavigableMap, V>>> sessions = new TreeMap<>(); private boolean open = true; public void put(final Windowed sessionKey, final V value) { @@ -42,6 +42,31 @@ public void put(final Windowed sessionKey, final V value) { sessions.get(sessionKey.key()).add(KeyValue.pair(sessionKey, value)); } + @Override + public KeyValueIterator, V> findSessions(K key, long earliestSessionEndTime, long latestSessionStartTime) { + throw new UnsupportedOperationException("Moved from Session Store. Implement if needed"); + } + + @Override + public KeyValueIterator, V> backwardFindSessions(K key, long earliestSessionEndTime, long latestSessionStartTime) { + throw new UnsupportedOperationException("Moved from Session Store. Implement if needed"); + } + + @Override + public KeyValueIterator, V> findSessions(K keyFrom, K keyTo, long earliestSessionEndTime, long latestSessionStartTime) { + throw new UnsupportedOperationException("Moved from Session Store. Implement if needed"); + } + + @Override + public KeyValueIterator, V> backwardFindSessions(K keyFrom, K keyTo, long earliestSessionEndTime, long latestSessionStartTime) { + throw new UnsupportedOperationException("Moved from Session Store. Implement if needed"); + } + + @Override + public V fetchSession(K key, long startTime, long endTime) { + throw new UnsupportedOperationException("Moved from Session Store. Implement if needed"); + } + @Override public KeyValueIterator, V> fetch(final K key) { if (!open) { @@ -53,6 +78,17 @@ public KeyValueIterator, V> fetch(final K key) { return new KeyValueIteratorStub<>(sessions.get(key).iterator()); } + @Override + public KeyValueIterator, V> backwardFetch(K key) { + if (!open) { + throw new InvalidStateStoreException("not open"); + } + if (!sessions.containsKey(key)) { + return new KeyValueIteratorStub<>(Collections.emptyIterator()); + } + return new KeyValueIteratorStub<>(sessions.descendingMap().get(key).iterator()); + } + @Override public KeyValueIterator, V> fetch(final K from, final K to) { if (!open) { @@ -87,6 +123,40 @@ public KeyValue, V> next() { ); } + @Override + public KeyValueIterator, V> backwardFetch(K from, K to) { + if (!open) { + throw new InvalidStateStoreException("not open"); + } + if (sessions.subMap(from, true, to, true).isEmpty()) { + return new KeyValueIteratorStub<>(Collections.emptyIterator()); + } + final Iterator, V>>> keysIterator = + sessions.subMap(from, true, to, true).descendingMap().values().iterator(); + return new KeyValueIteratorStub<>( + new Iterator, V>>() { + + Iterator, V>> it; + + @Override + public boolean hasNext() { + while (it == null || !it.hasNext()) { + if (!keysIterator.hasNext()) { + return false; + } + it = keysIterator.next().iterator(); + } + return true; + } + + @Override + public KeyValue, V> next() { + return it.next(); + } + } + ); + } + @Override public String name() { return "";