diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractReadOnlyDecorator.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractReadOnlyDecorator.java index 7c622fc0146e1..36c85116f2738 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractReadOnlyDecorator.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractReadOnlyDecorator.java @@ -246,8 +246,8 @@ public void put(final Windowed sessionKey, } @Override - public AGG fetchSession(final K key, final long startTime, final long endTime) { - return wrapped().fetchSession(key, startTime, endTime); + public AGG fetchSession(final K key, final long sessionStartTime, final long sessionEndTime) { + return wrapped().fetchSession(key, sessionStartTime, sessionEndTime); } @Override @@ -256,9 +256,9 @@ public KeyValueIterator, AGG> fetch(final K key) { } @Override - public KeyValueIterator, AGG> fetch(final K from, - final K to) { - return wrapped().fetch(from, to); + public KeyValueIterator, AGG> fetch(final K keyFrom, + final K keyTo) { + return wrapped().fetch(keyFrom, keyTo); } } } diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractReadWriteDecorator.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractReadWriteDecorator.java index 61e47f6cf6c6e..5c042b1d9d12c 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractReadWriteDecorator.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractReadWriteDecorator.java @@ -240,9 +240,9 @@ public void put(final Windowed sessionKey, @Override public AGG fetchSession(final K key, - final long startTime, - final long endTime) { - return wrapped().fetchSession(key, startTime, endTime); + final long sessionStartTime, + final long sessionEndTime) { + return wrapped().fetchSession(key, sessionStartTime, sessionEndTime); } @Override @@ -251,9 +251,9 @@ public KeyValueIterator, AGG> fetch(final K key) { } @Override - public KeyValueIterator, AGG> fetch(final K from, - final K to) { - return wrapped().fetch(from, to); + public KeyValueIterator, AGG> fetch(final K keyFrom, + final K keyTo) { + return wrapped().fetch(keyFrom, keyTo); } } } diff --git a/streams/src/main/java/org/apache/kafka/streams/state/ReadOnlySessionStore.java b/streams/src/main/java/org/apache/kafka/streams/state/ReadOnlySessionStore.java index 230d2576178da..5f1daf90b4bb8 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/ReadOnlySessionStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/ReadOnlySessionStore.java @@ -19,40 +19,86 @@ import org.apache.kafka.streams.kstream.Windowed; +import java.time.Instant; + /** * A session store that only supports read operations. * Implementations should be thread-safe as concurrent reads and writes * are expected. * - * @param the key type + * @param the key type * @param the aggregated value type */ public interface ReadOnlySessionStore { /** - * Retrieve all aggregated sessions for the provided key. + * Fetch any sessions with the matching key and the sessions end is ≥ earliestSessionEndTime and the sessions + * start is ≤ latestSessionStartTime + *

* This iterator must be closed after use. * + * @param key the key to return sessions for + * @param earliestSessionEndTime the end timestamp of the earliest session to search for + * @param latestSessionStartTime the end timestamp of the latest session to search for + * @return iterator of sessions with the matching key and aggregated values + * @throws NullPointerException if null is used for key. + */ + KeyValueIterator, AGG> findSessions(final K key, + final Instant earliestSessionEndTime, + final Instant latestSessionStartTime); + + /** + * Fetch any sessions in the given range of keys and the sessions end is ≥ earliestSessionEndTime and the sessions + * start is ≤ latestSessionStartTime + *

+ * This iterator must be closed after use. + * + * @param keyFrom The first key that could be in the range + * @param keyTo The last key that could be in the range + * @param earliestSessionEndTime the end timestamp of the earliest session to search for + * @param latestSessionStartTime the end timestamp of the latest session to search for + * @return iterator of sessions with the matching keys and aggregated values + * @throws NullPointerException if null is used for any key. + */ + KeyValueIterator, AGG> findSessions(final K keyFrom, + final K keyTo, + final Instant earliestSessionEndTime, + final Instant latestSessionStartTime); + + /** + * Get the value of key from a single session. + * + * @param key the key to fetch + * @param sessionStartTime start timestamp of the session + * @param sessionEndTime end timestamp of the session + * @return The value or {@code null} if no session associated with the key can be found + * @throws NullPointerException if {@code null} is used for any key. + */ + AGG fetchSession(final K key, final Instant sessionStartTime, final Instant sessionEndTime); + + /** + * Retrieve all aggregated sessions for the provided key. + * This iterator must be closed after use. + *

* For each key, the iterator guarantees ordering of sessions, starting from the oldest/earliest * available session to the newest/latest session. * - * @param key record key to find aggregated session values for - * @return KeyValueIterator containing all sessions for the provided key. - * @throws NullPointerException If null is used for key. - * + * @param key record key to find aggregated session values for + * @return KeyValueIterator containing all sessions for the provided key. + * @throws NullPointerException if null is used for key. */ KeyValueIterator, AGG> fetch(final K key); /** * Retrieve all aggregated sessions for the given range of keys. * This iterator must be closed after use. - * + *

* For each key, the iterator guarantees ordering of sessions, starting from the oldest/earliest * available session to the newest/latest session. * - * @param from first key in the range to find aggregated session values for - * @param to last key in the range to find aggregated session values for - * @return KeyValueIterator containing all sessions for the provided key. - * @throws NullPointerException If null is used for any of the keys. + * @param keyFrom first key in the range to find aggregated session values for + * @param keyTo last key in the range to find aggregated session values for + * @return KeyValueIterator containing all sessions for the provided key. + * @throws NullPointerException if null is used for any of the keys. */ - KeyValueIterator, AGG> fetch(final K from, final K to); + KeyValueIterator, AGG> fetch(final K keyFrom, final K keyTo); } diff --git a/streams/src/main/java/org/apache/kafka/streams/state/SessionStore.java b/streams/src/main/java/org/apache/kafka/streams/state/SessionStore.java index faaa751489af4..d455936915567 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/SessionStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/SessionStore.java @@ -16,10 +16,15 @@ */ package org.apache.kafka.streams.state; +import org.apache.kafka.streams.internals.ApiUtils; import org.apache.kafka.streams.kstream.Window; import org.apache.kafka.streams.kstream.Windowed; import org.apache.kafka.streams.processor.StateStore; +import java.time.Instant; + +import static org.apache.kafka.streams.internals.ApiUtils.prepareMillisCheckFailMsgPrefix; + /** * Interface for storing the aggregated values of sessions. *

@@ -29,6 +34,9 @@ * If two sessions are merged, a new session with new start- and end-timestamp must be inserted into the store * while the two old sessions must be deleted. * + * Long-based time-ranges are used instead of Instant-based exposed in Read-only interface to avoid performance + * penalties caused by object allocation when using Stores via Processor API. + * * @param type of the record keys * @param type of the aggregated values */ @@ -37,56 +45,93 @@ public interface SessionStore extends StateStore, ReadOnlySessionStore * This iterator must be closed after use. * - * @param key the key to return sessions for + * @param key the key to return sessions for * @param earliestSessionEndTime the end timestamp of the earliest session to search for * @param latestSessionStartTime the end timestamp of the latest session to search for * @return iterator of sessions with the matching key and aggregated values * @throws NullPointerException If null is used for key. */ - KeyValueIterator, AGG> findSessions(final K key, final long earliestSessionEndTime, final long latestSessionStartTime); + KeyValueIterator, AGG> findSessions(final K key, + final long earliestSessionEndTime, + final long latestSessionStartTime); + + @Override + default KeyValueIterator, AGG> findSessions(final K key, + final Instant earliestSessionEndTime, + final Instant latestSessionStartTime) { + return findSessions( + key, + ApiUtils.validateMillisecondInstant(earliestSessionEndTime, prepareMillisCheckFailMsgPrefix(earliestSessionEndTime, "earliestSessionEndTime")), + ApiUtils.validateMillisecondInstant(latestSessionStartTime, prepareMillisCheckFailMsgPrefix(latestSessionStartTime, "latestSessionStartTime"))); + } /** * Fetch any sessions in the given range of keys and the sessions end is ≥ earliestSessionEndTime and the sessions * start is ≤ latestSessionStartTime - * + *

* This iterator must be closed after use. * - * @param keyFrom The first key that could be in the range - * @param keyTo The last key that could be in the range + * @param keyFrom The first key that could be in the range + * @param keyTo The last key that could be in the range * @param earliestSessionEndTime the end timestamp of the earliest session to search for * @param latestSessionStartTime the end timestamp of the latest session to search for * @return iterator of sessions with the matching keys and aggregated values * @throws NullPointerException If null is used for any key. */ - KeyValueIterator, AGG> findSessions(final K keyFrom, final K keyTo, final long earliestSessionEndTime, final long latestSessionStartTime); + KeyValueIterator, AGG> findSessions(final K keyFrom, + final K keyTo, + final long earliestSessionEndTime, + final long latestSessionStartTime); + + @Override + default KeyValueIterator, AGG> findSessions(final K keyFrom, + final K keyTo, + final Instant earliestSessionEndTime, + final Instant latestSessionStartTime) { + return findSessions( + keyFrom, + keyTo, + ApiUtils.validateMillisecondInstant(earliestSessionEndTime, prepareMillisCheckFailMsgPrefix(earliestSessionEndTime, "earliestSessionEndTime")), + ApiUtils.validateMillisecondInstant(latestSessionStartTime, prepareMillisCheckFailMsgPrefix(latestSessionStartTime, "latestSessionStartTime"))); + } /** * Get the value of key from a single session. * - * @param key the key to fetch - * @param startTime start timestamp of the session - * @param endTime end timestamp of the session + * @param key the key to fetch + * @param sessionStartTime start timestamp of the session + * @param sessionEndTime end timestamp of the session * @return The value or {@code null} if no session associated with the key can be found - * @throws NullPointerException If {@code null} is used for any key. + * @throws NullPointerException if {@code null} is used for any key. */ - AGG fetchSession(final K key, final long startTime, final long endTime); + AGG fetchSession(final K key, final long sessionStartTime, final long sessionEndTime); + + @Override + default AGG fetchSession(final K key, final Instant sessionStartTime, final Instant sessionEndTime) { + return fetchSession( + key, + ApiUtils.validateMillisecondInstant(sessionStartTime, prepareMillisCheckFailMsgPrefix(sessionStartTime, "sessionStartTime")), + ApiUtils.validateMillisecondInstant(sessionEndTime, prepareMillisCheckFailMsgPrefix(sessionEndTime, "sessionEndTime"))); + } /** * Remove the session aggregated with provided {@link Windowed} key from the store + * * @param sessionKey key of the session to remove - * @throws NullPointerException If null is used for sessionKey. + * @throws NullPointerException if null is used for sessionKey. */ void remove(final Windowed sessionKey); /** * Write the aggregated value for the provided key to the store + * * @param sessionKey key of the session to write * @param aggregate the aggregated value for the session, it can be null; * if the serialized bytes are also null it is interpreted as deletes - * @throws NullPointerException If null is used for sessionKey. + * @throws NullPointerException if null is used for sessionKey. */ void put(final Windowed sessionKey, final AGG aggregate); } diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingSessionStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingSessionStore.java index 4ac43a216c3fe..32a2fd45e9010 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingSessionStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingSessionStore.java @@ -200,17 +200,17 @@ public KeyValueIterator, byte[]> findSessions(final Bytes keyFro } @Override - public byte[] fetchSession(final Bytes key, final long startTime, final long endTime) { + public byte[] fetchSession(final Bytes key, final long sessionStartTime, final long sessionEndTime) { Objects.requireNonNull(key, "key cannot be null"); validateStoreOpen(); if (context.cache() == null) { - return wrapped().fetchSession(key, startTime, endTime); + return wrapped().fetchSession(key, sessionStartTime, sessionEndTime); } else { - final Bytes bytesKey = SessionKeySchema.toBinary(key, startTime, endTime); + final Bytes bytesKey = SessionKeySchema.toBinary(key, sessionStartTime, sessionEndTime); final Bytes cacheKey = cacheFunction.cacheKey(bytesKey); final LRUCacheEntry entry = context.cache().get(cacheName, cacheKey); if (entry == null) { - return wrapped().fetchSession(key, startTime, endTime); + return wrapped().fetchSession(key, sessionStartTime, sessionEndTime); } else { return entry.value(); } @@ -224,11 +224,11 @@ public KeyValueIterator, byte[]> fetch(final Bytes key) { } @Override - public KeyValueIterator, byte[]> fetch(final Bytes from, - final Bytes to) { - Objects.requireNonNull(from, "from cannot be null"); - Objects.requireNonNull(to, "to cannot be null"); - return findSessions(from, to, 0, Long.MAX_VALUE); + public KeyValueIterator, byte[]> fetch(final Bytes keyFrom, + final Bytes keyTo) { + Objects.requireNonNull(keyFrom, "from cannot be null"); + Objects.requireNonNull(keyTo, "to cannot be null"); + return findSessions(keyFrom, keyTo, 0, Long.MAX_VALUE); } public void flush() { diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingSessionBytesStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingSessionBytesStore.java index cc586d3ba1a90..de4440f06c0e2 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingSessionBytesStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingSessionBytesStore.java @@ -67,8 +67,8 @@ public void put(final Windowed sessionKey, final byte[] aggregate) { } @Override - public byte[] fetchSession(final Bytes key, final long startTime, final long endTime) { - return wrapped().fetchSession(key, startTime, endTime); + public byte[] fetchSession(final Bytes key, final long sessionStartTime, final long sessionEndTime) { + return wrapped().fetchSession(key, sessionStartTime, sessionEndTime); } @Override @@ -77,7 +77,7 @@ public KeyValueIterator, byte[]> fetch(final Bytes key) { } @Override - public KeyValueIterator, byte[]> fetch(final Bytes from, final Bytes to) { - return wrapped().fetch(from, to); + public KeyValueIterator, byte[]> fetch(final Bytes keyFrom, final Bytes keyTo) { + return wrapped().fetch(keyFrom, keyTo); } } diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/CompositeReadOnlySessionStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/CompositeReadOnlySessionStore.java index 63d551c1d089e..ad4b1164c751d 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/CompositeReadOnlySessionStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/CompositeReadOnlySessionStore.java @@ -22,6 +22,7 @@ import org.apache.kafka.streams.state.QueryableStoreType; import org.apache.kafka.streams.state.ReadOnlySessionStore; +import java.time.Instant; import java.util.List; import java.util.Objects; @@ -42,6 +43,73 @@ public CompositeReadOnlySessionStore(final StateStoreProvider storeProvider, this.storeName = storeName; } + @Override + public KeyValueIterator, V> findSessions(final K key, + final Instant earliestSessionEndTime, + final Instant latestSessionStartTime) { + Objects.requireNonNull(key, "key can't be null"); + final List> stores = storeProvider.stores(storeName, queryableStoreType); + for (final ReadOnlySessionStore store : stores) { + try { + final KeyValueIterator, V> result = store.findSessions(key, earliestSessionEndTime, latestSessionStartTime); + if (!result.hasNext()) { + result.close(); + } else { + return result; + } + } catch (final InvalidStateStoreException ise) { + throw new InvalidStateStoreException("State store [" + storeName + "] is not available anymore" + + " and may have been migrated to another instance; " + + "please re-discover its location from the state metadata. " + + "Original error message: " + ise.toString()); + } + } + return KeyValueIterators.emptyIterator(); + } + + @Override + public KeyValueIterator, V> findSessions(final K keyFrom, + final K keyTo, + final Instant earliestSessionEndTime, + final Instant latestSessionStartTime) { + Objects.requireNonNull(keyFrom, "keyFrom can't be null"); + Objects.requireNonNull(keyTo, "keyTo can't be null"); + final List> stores = storeProvider.stores(storeName, queryableStoreType); + for (final ReadOnlySessionStore store : stores) { + try { + final KeyValueIterator, V> result = store.findSessions(keyFrom, keyTo, earliestSessionEndTime, latestSessionStartTime); + if (!result.hasNext()) { + result.close(); + } else { + return result; + } + } catch (final InvalidStateStoreException ise) { + throw new InvalidStateStoreException("State store [" + storeName + "] is not available anymore" + + " and may have been migrated to another instance; " + + "please re-discover its location from the state metadata. " + + "Original error message: " + ise.toString()); + } + } + return KeyValueIterators.emptyIterator(); + } + + @Override + public V fetchSession(final K key, final Instant sessionStartTime, final Instant sessionEndTime) { + Objects.requireNonNull(key, "key can't be null"); + final List> stores = storeProvider.stores(storeName, queryableStoreType); + for (final ReadOnlySessionStore store : stores) { + try { + return store.fetchSession(key, sessionStartTime, sessionEndTime); + } catch (final InvalidStateStoreException ise) { + throw new InvalidStateStoreException("State store [" + storeName + "] is not available anymore" + + " and may have been migrated to another instance; " + + "please re-discover its location from the state metadata. " + + "Original error message: " + ise.toString()); + } + } + return null; + } + @Override public KeyValueIterator, V> fetch(final K key) { Objects.requireNonNull(key, "key can't be null"); @@ -56,22 +124,22 @@ public KeyValueIterator, V> fetch(final K key) { } } catch (final InvalidStateStoreException ise) { throw new InvalidStateStoreException("State store [" + storeName + "] is not available anymore" + - " and may have been migrated to another instance; " + - "please re-discover its location from the state metadata. " + - "Original error message: " + ise.toString()); + " and may have been migrated to another instance; " + + "please re-discover its location from the state metadata. " + + "Original error message: " + ise.toString()); } } return KeyValueIterators.emptyIterator(); } @Override - public KeyValueIterator, V> fetch(final K from, final K to) { - Objects.requireNonNull(from, "from can't be null"); - Objects.requireNonNull(to, "to can't be null"); - final NextIteratorFunction, V, ReadOnlySessionStore> nextIteratorFunction = store -> store.fetch(from, to); + public KeyValueIterator, V> fetch(final K keyFrom, final K keyTo) { + Objects.requireNonNull(keyFrom, "from can't be null"); + Objects.requireNonNull(keyTo, "to can't be null"); + final NextIteratorFunction, V, ReadOnlySessionStore> nextIteratorFunction = store -> store.fetch(keyFrom, keyTo); return new DelegatingPeekingKeyValueIterator<>(storeName, - new CompositeKeyValueIterator<>( - storeProvider.stores(storeName, queryableStoreType).iterator(), - nextIteratorFunction)); + new CompositeKeyValueIterator<>( + storeProvider.stores(storeName, queryableStoreType).iterator(), + nextIteratorFunction)); } } diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemorySessionStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemorySessionStore.java index e4fda06682c86..01422edbfba50 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemorySessionStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemorySessionStore.java @@ -138,18 +138,18 @@ public void remove(final Windowed sessionKey) { } @Override - public byte[] fetchSession(final Bytes key, final long startTime, final long endTime) { + public byte[] fetchSession(final Bytes key, final long sessionStartTime, final long sessionEndTime) { removeExpiredSegments(); Objects.requireNonNull(key, "key cannot be null"); // Only need to search if the record hasn't expired yet - if (endTime > observedStreamTime - retentionPeriod) { - final ConcurrentNavigableMap> keyMap = endTimeMap.get(endTime); + if (sessionEndTime > observedStreamTime - retentionPeriod) { + final ConcurrentNavigableMap> keyMap = endTimeMap.get(sessionEndTime); if (keyMap != null) { final ConcurrentNavigableMap startTimeMap = keyMap.get(key); if (startTimeMap != null) { - return startTimeMap.get(startTime); + return startTimeMap.get(sessionStartTime); } } } @@ -205,15 +205,15 @@ public KeyValueIterator, byte[]> fetch(final Bytes key) { } @Override - public KeyValueIterator, byte[]> fetch(final Bytes from, final Bytes to) { + public KeyValueIterator, byte[]> fetch(final Bytes keyFrom, final Bytes keyTo) { - Objects.requireNonNull(from, "from key cannot be null"); - Objects.requireNonNull(to, "to key cannot be null"); + Objects.requireNonNull(keyFrom, "from key cannot be null"); + Objects.requireNonNull(keyTo, "to key cannot be null"); removeExpiredSegments(); - return registerNewIterator(from, to, Long.MAX_VALUE, endTimeMap.entrySet().iterator()); + return registerNewIterator(keyFrom, keyTo, Long.MAX_VALUE, endTimeMap.entrySet().iterator()); } @Override diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java index d8ce02a6d18bb..252a9ea4e5898 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java @@ -158,12 +158,12 @@ public void remove(final Windowed sessionKey) { } @Override - public V fetchSession(final K key, final long startTime, final long endTime) { + public V fetchSession(final K key, final long sessionStartTime, final long sessionEndTime) { Objects.requireNonNull(key, "key cannot be null"); return maybeMeasureLatency( () -> { final Bytes bytesKey = keyBytes(key); - final byte[] result = wrapped().fetchSession(bytesKey, startTime, endTime); + final byte[] result = wrapped().fetchSession(bytesKey, sessionStartTime, sessionEndTime); if (result == null) { return null; } @@ -186,12 +186,12 @@ public KeyValueIterator, V> fetch(final K key) { } @Override - public KeyValueIterator, V> fetch(final K from, - final K to) { - Objects.requireNonNull(from, "from cannot be null"); - Objects.requireNonNull(to, "to cannot be null"); + public KeyValueIterator, V> fetch(final K keyFrom, + final K keyTo) { + Objects.requireNonNull(keyFrom, "from cannot be null"); + Objects.requireNonNull(keyTo, "to cannot be null"); return new MeteredWindowedKeyValueIterator<>( - wrapped().fetch(keyBytes(from), keyBytes(to)), + wrapped().fetch(keyBytes(keyFrom), keyBytes(keyTo)), fetchSensor, streamsMetrics, serdes, diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBSessionStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBSessionStore.java index 2f7a211d5266a..34fac97a9e9af 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBSessionStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBSessionStore.java @@ -57,8 +57,8 @@ public KeyValueIterator, byte[]> findSessions(final Bytes keyFro } @Override - public byte[] fetchSession(final Bytes key, final long startTime, final long endTime) { - return wrapped().get(SessionKeySchema.toBinary(key, startTime, endTime)); + public byte[] fetchSession(final Bytes key, final long sessionStartTime, final long sessionEndTime) { + return wrapped().get(SessionKeySchema.toBinary(key, sessionStartTime, sessionEndTime)); } @Override @@ -67,8 +67,8 @@ public KeyValueIterator, byte[]> fetch(final Bytes key) { } @Override - public KeyValueIterator, byte[]> fetch(final Bytes from, final Bytes to) { - return findSessions(from, to, 0, Long.MAX_VALUE); + public KeyValueIterator, byte[]> fetch(final Bytes keyFrom, final Bytes keyTo) { + return findSessions(keyFrom, keyTo, 0, Long.MAX_VALUE); } @Override diff --git a/streams/src/test/java/org/apache/kafka/test/ReadOnlySessionStoreStub.java b/streams/src/test/java/org/apache/kafka/test/ReadOnlySessionStoreStub.java index 4f6d5debb0ada..c4db20acfb1c5 100644 --- a/streams/src/test/java/org/apache/kafka/test/ReadOnlySessionStoreStub.java +++ b/streams/src/test/java/org/apache/kafka/test/ReadOnlySessionStoreStub.java @@ -24,15 +24,17 @@ import org.apache.kafka.streams.state.KeyValueIterator; import org.apache.kafka.streams.state.ReadOnlySessionStore; +import java.time.Instant; import java.util.ArrayList; import java.util.Collections; import java.util.Iterator; import java.util.List; import java.util.NavigableMap; +import java.util.NoSuchElementException; import java.util.TreeMap; public class ReadOnlySessionStoreStub implements ReadOnlySessionStore, StateStore { - private NavigableMap, V>>> sessions = new TreeMap<>(); + private final NavigableMap, V>>> sessions = new TreeMap<>(); private boolean open = true; public void put(final Windowed sessionKey, final V value) { @@ -42,6 +44,116 @@ public void put(final Windowed sessionKey, final V value) { sessions.get(sessionKey.key()).add(KeyValue.pair(sessionKey, value)); } + @Override + public KeyValueIterator, V> findSessions(final K key, + final Instant earliestSessionEndTime, + final Instant latestSessionStartTime) { + if (!open) { + throw new InvalidStateStoreException("not open"); + } + if (!sessions.containsKey(key)) { + return null; + } + List, V>> found = new ArrayList<>(); + for (KeyValue, V> keyValue: sessions.get(key)) { + if (keyValue.key.window().startTime().compareTo(latestSessionStartTime) == 0 || + keyValue.key.window().endTime().compareTo(earliestSessionEndTime) == 0) { + found.add(KeyValue.pair(keyValue.key, keyValue.value)); + } + } + Iterator, V>> sessionsIterator = found.iterator(); + return new KeyValueIterator, V>() { + @Override + public void close() { + + } + + @Override + public Windowed peekNextKey() { + if (!hasNext()) { + throw new NoSuchElementException(); + } + return next().key; + } + + @Override + public boolean hasNext() { + return sessionsIterator.hasNext(); + } + + @Override + public KeyValue, V> next() { + return sessionsIterator.next(); + } + }; + } + + @Override + public KeyValueIterator, V> findSessions(final K keyFrom, + final K keyTo, + final Instant earliestSessionEndTime, + final Instant latestSessionStartTime) { + if (!open) { + throw new InvalidStateStoreException("not open"); + } + if (sessions.subMap(keyFrom, true, keyTo, true).isEmpty()) { + return new KeyValueIteratorStub<>(Collections., V>>emptyIterator()); + } + final Iterator, V>>> keysIterator = sessions.subMap(keyFrom, true, keyTo, true).values().iterator(); + List, V>> found = new ArrayList<>(); + while (keysIterator.hasNext()) { + final List, V>> iterator = keysIterator.next(); + for (final KeyValue, V> keyValue : iterator) { + if (keyValue.key.window().startTime().compareTo(latestSessionStartTime) == 0 || + keyValue.key.window().endTime().compareTo(earliestSessionEndTime) == 0) { + found.add(KeyValue.pair(keyValue.key, keyValue.value)); + } + } + } + final Iterator, V>> sessionsIterator = found.iterator(); + return new KeyValueIterator, V>() { + @Override + public void close() { + + } + + @Override + public Windowed peekNextKey() { + if (!hasNext()) { + throw new NoSuchElementException(); + } + return next().key; + } + + @Override + public boolean hasNext() { + return sessionsIterator.hasNext(); + } + + @Override + public KeyValue, V> next() { + return sessionsIterator.next(); + } + }; + } + + @Override + public V fetchSession(final K key, final Instant sessionStartTime, final Instant sessionEndTime) { + if (!open) { + throw new InvalidStateStoreException("not open"); + } + if (!sessions.containsKey(key)) { + return null; + } + for (KeyValue, V> keyValue: sessions.get(key)) { + if (keyValue.key.window().startTime().compareTo(sessionStartTime) == 0 && + keyValue.key.window().endTime().compareTo(sessionEndTime) == 0) { + return keyValue.value; + } + } + return null; + } + @Override public KeyValueIterator, V> fetch(final K key) { if (!open) { @@ -54,14 +166,14 @@ public KeyValueIterator, V> fetch(final K key) { } @Override - public KeyValueIterator, V> fetch(final K from, final K to) { + public KeyValueIterator, V> fetch(final K keyFrom, final K keyTo) { if (!open) { throw new InvalidStateStoreException("not open"); } - if (sessions.subMap(from, true, to, true).isEmpty()) { + if (sessions.subMap(keyFrom, true, keyTo, true).isEmpty()) { return new KeyValueIteratorStub<>(Collections., V>>emptyIterator()); } - final Iterator, V>>> keysIterator = sessions.subMap(from, true, to, true).values().iterator(); + final Iterator, V>>> keysIterator = sessions.subMap(keyFrom, true, keyTo, true).values().iterator(); return new KeyValueIteratorStub<>( new Iterator, V>>() { @@ -94,17 +206,14 @@ public String name() { @Override public void init(final ProcessorContext context, final StateStore root) { - } @Override public void flush() { - } @Override public void close() { - } @Override