diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractReadOnlyDecorator.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractReadOnlyDecorator.java index 6c0c97599df65..be47e89c765c1 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractReadOnlyDecorator.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractReadOnlyDecorator.java @@ -273,8 +273,8 @@ public void put(final Windowed sessionKey, } @Override - public AGG fetchSession(final K key, final long startTime, final long endTime) { - return wrapped().fetchSession(key, startTime, endTime); + public AGG fetchSession(final K key, final long earliestSessionEndTime, final long latestSessionStartTime) { + return wrapped().fetchSession(key, earliestSessionEndTime, latestSessionStartTime); } @Override @@ -283,9 +283,9 @@ public KeyValueIterator, AGG> fetch(final K key) { } @Override - public KeyValueIterator, AGG> fetch(final K from, - final K to) { - return wrapped().fetch(from, to); + public KeyValueIterator, AGG> fetch(final K keyFrom, + final K keyTo) { + return wrapped().fetch(keyFrom, keyTo); } } } diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractReadWriteDecorator.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractReadWriteDecorator.java index eac606cdbe302..60fa8f436fbb6 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractReadWriteDecorator.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractReadWriteDecorator.java @@ -265,9 +265,9 @@ public void put(final Windowed sessionKey, @Override public AGG fetchSession(final K key, - final long startTime, - final long endTime) { - return wrapped().fetchSession(key, startTime, endTime); + final long earliestSessionEndTime, + final long latestSessionStartTime) { + return wrapped().fetchSession(key, earliestSessionEndTime, latestSessionStartTime); } @Override @@ -276,9 +276,9 @@ public KeyValueIterator, AGG> fetch(final K key) { } @Override - public KeyValueIterator, AGG> fetch(final K from, - final K to) { - return wrapped().fetch(from, to); + public KeyValueIterator, AGG> fetch(final K keyFrom, + final K keyTo) { + return wrapped().fetch(keyFrom, keyTo); } } } diff --git a/streams/src/main/java/org/apache/kafka/streams/state/ReadOnlySessionStore.java b/streams/src/main/java/org/apache/kafka/streams/state/ReadOnlySessionStore.java index 8874908d18072..0ade24286fdd6 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/ReadOnlySessionStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/ReadOnlySessionStore.java @@ -19,162 +19,298 @@ import org.apache.kafka.streams.kstream.Windowed; +import java.time.Instant; + /** - * A session store that only supports read operations. - * Implementations should be thread-safe as concurrent reads and writes - * are expected. + * A session store that only supports read operations. Implementations should be thread-safe as + * concurrent reads and writes are expected. * - * @param the key type + * @param the key type * @param the aggregated value type */ public interface ReadOnlySessionStore { /** - * Fetch any sessions with the matching key and the sessions end is ≥ earliestSessionEndTime and the sessions - * start is ≤ latestSessionStartTime iterating from earliest to latest. + * Fetch any sessions with the matching key and the sessions end is ≥ earliestSessionEndTime + * and the sessions start is ≤ latestSessionStartTime iterating from earliest to latest. *

* This iterator must be closed after use. * * @param key the key to return sessions for - * @param earliestSessionEndTime the end timestamp of the earliest session to search for, where iteration starts. - * @param latestSessionStartTime the end timestamp of the latest session to search for, where iteration ends. - * @return iterator of sessions with the matching key and aggregated values, from earliest to latest session time. + * @param earliestSessionEndTime the end timestamp of the earliest session to search for, where + * iteration starts. + * @param latestSessionStartTime the end timestamp of the latest session to search for, where + * iteration ends. + * @return iterator of sessions with the matching key and aggregated values, from earliest to + * latest session time. * @throws NullPointerException If null is used for key. */ default KeyValueIterator, AGG> findSessions(final K key, final long earliestSessionEndTime, final long latestSessionStartTime) { - throw new UnsupportedOperationException("This API is not supported by this implementation of ReadOnlySessionStore."); + throw new UnsupportedOperationException( + "This API is not supported by this implementation of ReadOnlySessionStore."); } /** - * Fetch any sessions with the matching key and the sessions end is ≥ earliestSessionEndTime and the sessions - * start is ≤ latestSessionStartTime iterating from latest to earliest. + * Fetch any sessions with the matching key and the sessions end is ≥ earliestSessionEndTime + * and the sessions start is ≤ latestSessionStartTime iterating from earliest to latest. *

* This iterator must be closed after use. * * @param key the key to return sessions for - * @param earliestSessionEndTime the end timestamp of the earliest session to search for, where iteration ends. - * @param latestSessionStartTime the end timestamp of the latest session to search for, where iteration starts. - * @return backward iterator of sessions with the matching key and aggregated values, from latest to earliest session time. + * @param earliestSessionEndTime the end timestamp of the earliest session to search for, where + * iteration starts. + * @param latestSessionStartTime the end timestamp of the latest session to search for, where + * iteration ends. + * @return iterator of sessions with the matching key and aggregated values, from earliest to + * latest session time. + * @throws NullPointerException If null is used for key. + */ + default KeyValueIterator, AGG> findSessions(final K key, + final Instant earliestSessionEndTime, + final Instant latestSessionStartTime) { + throw new UnsupportedOperationException( + "This API is not supported by this implementation of ReadOnlySessionStore."); + } + + /** + * Fetch any sessions with the matching key and the sessions end is ≥ earliestSessionEndTime + * and the sessions start is ≤ latestSessionStartTime iterating from latest to earliest. + *

+ * This iterator must be closed after use. + * + * @param key the key to return sessions for + * @param earliestSessionEndTime the end timestamp of the earliest session to search for, where + * iteration ends. + * @param latestSessionStartTime the end timestamp of the latest session to search for, where + * iteration starts. + * @return backward iterator of sessions with the matching key and aggregated values, from + * latest to earliest session time. * @throws NullPointerException If null is used for key. */ default KeyValueIterator, AGG> backwardFindSessions(final K key, final long earliestSessionEndTime, final long latestSessionStartTime) { - throw new UnsupportedOperationException("This API is not supported by this implementation of ReadOnlySessionStore."); + throw new UnsupportedOperationException( + "This API is not supported by this implementation of ReadOnlySessionStore."); + } + + /** + * Fetch any sessions with the matching key and the sessions end is ≥ earliestSessionEndTime + * and the sessions start is ≤ latestSessionStartTime iterating from latest to earliest. + *

+ * This iterator must be closed after use. + * + * @param key the key to return sessions for + * @param earliestSessionEndTime the end timestamp of the earliest session to search for, where + * iteration ends. + * @param latestSessionStartTime the end timestamp of the latest session to search for, where + * iteration starts. + * @return backward iterator of sessions with the matching key and aggregated values, from + * latest to earliest session time. + * @throws NullPointerException If null is used for key. + */ + default KeyValueIterator, AGG> backwardFindSessions(final K key, + final Instant earliestSessionEndTime, + final Instant latestSessionStartTime) { + throw new UnsupportedOperationException( + "This API is not supported by this implementation of ReadOnlySessionStore."); } /** - * Fetch any sessions in the given range of keys and the sessions end is ≥ earliestSessionEndTime and the sessions - * start is ≤ latestSessionStartTime iterating from earliest to latest. + * Fetch any sessions in the given range of keys and the sessions end is ≥ + * earliestSessionEndTime and the sessions start is ≤ latestSessionStartTime iterating from + * earliest to latest. *

* This iterator must be closed after use. * * @param keyFrom The first key that could be in the range * @param keyTo The last key that could be in the range - * @param earliestSessionEndTime the end timestamp of the earliest session to search for, where iteration starts. - * @param latestSessionStartTime the end timestamp of the latest session to search for, where iteration ends. - * @return iterator of sessions with the matching keys and aggregated values, from earliest to latest session time. + * @param earliestSessionEndTime the end timestamp of the earliest session to search for, where + * iteration starts. + * @param latestSessionStartTime the end timestamp of the latest session to search for, where + * iteration ends. + * @return iterator of sessions with the matching keys and aggregated values, from earliest to + * latest session time. * @throws NullPointerException If null is used for any key. */ default KeyValueIterator, AGG> findSessions(final K keyFrom, final K keyTo, final long earliestSessionEndTime, final long latestSessionStartTime) { - throw new UnsupportedOperationException("This API is not supported by this implementation of ReadOnlySessionStore."); + throw new UnsupportedOperationException( + "This API is not supported by this implementation of ReadOnlySessionStore."); } + /** + * Fetch any sessions in the given range of keys and the sessions end is ≥ + * earliestSessionEndTime and the sessions start is ≤ latestSessionStartTime iterating from + * earliest to latest. + *

+ * This iterator must be closed after use. + * + * @param keyFrom The first key that could be in the range + * @param keyTo The last key that could be in the range + * @param earliestSessionEndTime the end timestamp of the earliest session to search for, where + * iteration starts. + * @param latestSessionStartTime the end timestamp of the latest session to search for, where + * iteration ends. + * @return iterator of sessions with the matching keys and aggregated values, from earliest to + * latest session time. + * @throws NullPointerException If null is used for any key. + */ + default KeyValueIterator, AGG> findSessions(final K keyFrom, + final K keyTo, + final Instant earliestSessionEndTime, + final Instant latestSessionStartTime) { + throw new UnsupportedOperationException( + "This API is not supported by this implementation of ReadOnlySessionStore."); + } /** - * Fetch any sessions in the given range of keys and the sessions end is ≥ earliestSessionEndTime and the sessions - * start is ≤ latestSessionStartTime iterating from latest to earliest. + * Fetch any sessions in the given range of keys and the sessions end is ≥ + * earliestSessionEndTime and the sessions start is ≤ latestSessionStartTime iterating from + * latest to earliest. *

* This iterator must be closed after use. * * @param keyFrom The first key that could be in the range * @param keyTo The last key that could be in the range - * @param earliestSessionEndTime the end timestamp of the earliest session to search for, where iteration ends. - * @param latestSessionStartTime the end timestamp of the latest session to search for, where iteration starts. - * @return backward iterator of sessions with the matching keys and aggregated values, from latest to earliest session time. + * @param earliestSessionEndTime the end timestamp of the earliest session to search for, where + * iteration ends. + * @param latestSessionStartTime the end timestamp of the latest session to search for, where + * iteration starts. + * @return backward iterator of sessions with the matching keys and aggregated values, from + * latest to earliest session time. * @throws NullPointerException If null is used for any key. */ default KeyValueIterator, AGG> backwardFindSessions(final K keyFrom, final K keyTo, final long earliestSessionEndTime, final long latestSessionStartTime) { - throw new UnsupportedOperationException("This API is not supported by this implementation of ReadOnlySessionStore."); + throw new UnsupportedOperationException( + "This API is not supported by this implementation of ReadOnlySessionStore."); + } + + /** + * Fetch any sessions in the given range of keys and the sessions end is ≥ + * earliestSessionEndTime and the sessions start is ≤ latestSessionStartTime iterating from + * latest to earliest. + *

+ * This iterator must be closed after use. + * + * @param keyFrom The first key that could be in the range + * @param keyTo The last key that could be in the range + * @param earliestSessionEndTime the end timestamp of the earliest session to search for, where + * iteration ends. + * @param latestSessionStartTime the end timestamp of the latest session to search for, where + * iteration starts. + * @return backward iterator of sessions with the matching keys and aggregated values, from + * latest to earliest session time. + * @throws NullPointerException If null is used for any key. + */ + default KeyValueIterator, AGG> backwardFindSessions(final K keyFrom, + final K keyTo, + final Instant earliestSessionEndTime, + final Instant latestSessionStartTime) { + throw new UnsupportedOperationException( + "This API is not supported by this implementation of ReadOnlySessionStore."); } /** * Get the value of key from a single session. * - * @param key the key to fetch - * @param startTime start timestamp of the session - * @param endTime end timestamp of the session + * @param key the key to fetch + * @param earliestSessionEndTime start timestamp of the session + * @param latestSessionStartTime end timestamp of the session * @return The value or {@code null} if no session associated with the key can be found * @throws NullPointerException If {@code null} is used for any key. */ - default AGG fetchSession(final K key, final long startTime, final long endTime) { - throw new UnsupportedOperationException("This API is not supported by this implementation of ReadOnlySessionStore."); + default AGG fetchSession(final K key, + final long earliestSessionEndTime, + final long latestSessionStartTime) { + throw new UnsupportedOperationException( + "This API is not supported by this implementation of ReadOnlySessionStore."); } /** - * Retrieve all aggregated sessions for the provided key. - * This iterator must be closed after use. + * Get the value of key from a single session. + * + * @param key the key to fetch + * @param earliestSessionEndTime start timestamp of the session + * @param latestSessionStartTime end timestamp of the session + * @return The value or {@code null} if no session associated with the key can be found + * @throws NullPointerException If {@code null} is used for any key. + */ + default AGG fetchSession(final K key, + final Instant earliestSessionEndTime, + final Instant latestSessionStartTime) { + throw new UnsupportedOperationException( + "This API is not supported by this implementation of ReadOnlySessionStore."); + } + + /** + * Retrieve all aggregated sessions for the provided key. This iterator must be closed after + * use. *

* For each key, the iterator guarantees ordering of sessions, starting from the oldest/earliest * available session to the newest/latest session. * - * @param key record key to find aggregated session values for - * @return KeyValueIterator containing all sessions for the provided key, from oldest to newest session. - * @throws NullPointerException If null is used for key. - * + * @param key record key to find aggregated session values for + * @return KeyValueIterator containing all sessions for the provided key, from oldest to newest + * session. + * @throws NullPointerException If null is used for key. */ KeyValueIterator, AGG> fetch(final K key); /** - * Retrieve all aggregated sessions for the provided key. - * This iterator must be closed after use. + * Retrieve all aggregated sessions for the provided key. This iterator must be closed after + * use. *

* For each key, the iterator guarantees ordering of sessions, starting from the newest/latest * available session to the oldest/earliest session. * * @param key record key to find aggregated session values for - * @return backward KeyValueIterator containing all sessions for the provided key, from newest to oldest session. + * @return backward KeyValueIterator containing all sessions for the provided key, from newest + * to oldest session. * @throws NullPointerException If null is used for key. */ default KeyValueIterator, AGG> backwardFetch(final K key) { - throw new UnsupportedOperationException("This API is not supported by this implementation of ReadOnlySessionStore."); + throw new UnsupportedOperationException( + "This API is not supported by this implementation of ReadOnlySessionStore."); } /** - * Retrieve all aggregated sessions for the given range of keys. - * This iterator must be closed after use. + * Retrieve all aggregated sessions for the given range of keys. This iterator must be closed + * after use. *

* For each key, the iterator guarantees ordering of sessions, starting from the oldest/earliest * available session to the newest/latest session. * - * @param from first key in the range to find aggregated session values for - * @param to last key in the range to find aggregated session values for - * @return KeyValueIterator containing all sessions for the provided key, from oldest to newest session. - * @throws NullPointerException If null is used for any of the keys. + * @param keyFrom first key in the range to find aggregated session values for + * @param keyTo last key in the range to find aggregated session values for + * @return KeyValueIterator containing all sessions for the provided key, from oldest to newest + * session. + * @throws NullPointerException If null is used for any of the keys. */ - KeyValueIterator, AGG> fetch(final K from, final K to); + KeyValueIterator, AGG> fetch(final K keyFrom, final K keyTo); /** - * Retrieve all aggregated sessions for the given range of keys. - * This iterator must be closed after use. + * Retrieve all aggregated sessions for the given range of keys. This iterator must be closed + * after use. *

* For each key, the iterator guarantees ordering of sessions, starting from the newest/latest * available session to the oldest/earliest session. * - * @param from first key in the range to find aggregated session values for - * @param to last key in the range to find aggregated session values for - * @return backward KeyValueIterator containing all sessions for the provided key, from newest to oldest session. + * @param keyFrom first key in the range to find aggregated session values for + * @param keyTo last key in the range to find aggregated session values for + * @return backward KeyValueIterator containing all sessions for the provided key, from newest + * to oldest session. * @throws NullPointerException If null is used for any of the keys. */ - default KeyValueIterator, AGG> backwardFetch(final K from, final K to) { - throw new UnsupportedOperationException("This API is not supported by this implementation of ReadOnlySessionStore."); + default KeyValueIterator, AGG> backwardFetch(final K keyFrom, final K keyTo) { + throw new UnsupportedOperationException( + "This API is not supported by this implementation of ReadOnlySessionStore."); } } diff --git a/streams/src/main/java/org/apache/kafka/streams/state/SessionStore.java b/streams/src/main/java/org/apache/kafka/streams/state/SessionStore.java index 47f48d5e8143c..926cddc4d2a43 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/SessionStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/SessionStore.java @@ -16,26 +16,90 @@ */ package org.apache.kafka.streams.state; +import org.apache.kafka.streams.internals.ApiUtils; import org.apache.kafka.streams.kstream.Window; import org.apache.kafka.streams.kstream.Windowed; import org.apache.kafka.streams.processor.StateStore; +import java.time.Instant; + +import static org.apache.kafka.streams.internals.ApiUtils.prepareMillisCheckFailMsgPrefix; + /** * Interface for storing the aggregated values of sessions. *

- * The key is internally represented as {@link Windowed Windowed<K>} that comprises the plain key - * and the {@link Window} that represents window start- and end-timestamp. + * The key is internally represented as {@link Windowed Windowed<K>} that comprises the plain + * key and the {@link Window} that represents window start- and end-timestamp. *

- * If two sessions are merged, a new session with new start- and end-timestamp must be inserted into the store - * while the two old sessions must be deleted. + * If two sessions are merged, a new session with new start- and end-timestamp must be inserted into + * the store while the two old sessions must be deleted. * * @param type of the record keys * @param type of the aggregated values */ public interface SessionStore extends StateStore, ReadOnlySessionStore { + @Override + default KeyValueIterator, AGG> findSessions(final K key, + final Instant earliestSessionEndTime, + final Instant latestSessionStartTime) { + return findSessions( + key, + ApiUtils.validateMillisecondInstant(earliestSessionEndTime, + prepareMillisCheckFailMsgPrefix(earliestSessionEndTime, "earliestSessionEndTime")), + ApiUtils.validateMillisecondInstant(latestSessionStartTime, + prepareMillisCheckFailMsgPrefix(latestSessionStartTime, "latestSessionStartTime"))); + } + + @Override + default KeyValueIterator, AGG> backwardFindSessions(final K key, + final Instant earliestSessionEndTime, + final Instant latestSessionStartTime) { + return backwardFindSessions( + key, + ApiUtils.validateMillisecondInstant(earliestSessionEndTime, + prepareMillisCheckFailMsgPrefix(earliestSessionEndTime, "earliestSessionEndTime")), + ApiUtils.validateMillisecondInstant(latestSessionStartTime, + prepareMillisCheckFailMsgPrefix(latestSessionStartTime, "latestSessionStartTime"))); + } + + default KeyValueIterator, AGG> findSessions(final K keyFrom, + final K keyTo, + final Instant earliestSessionEndTime, + final Instant latestSessionStartTime) { + return findSessions( + keyFrom, + keyTo, + ApiUtils.validateMillisecondInstant(earliestSessionEndTime, + prepareMillisCheckFailMsgPrefix(earliestSessionEndTime, "earliestSessionEndTime")), + ApiUtils.validateMillisecondInstant(latestSessionStartTime, + prepareMillisCheckFailMsgPrefix(latestSessionStartTime, "latestSessionStartTime"))); + } + + default KeyValueIterator, AGG> backwardFindSessions(final K keyFrom, + final K keyTo, + final Instant earliestSessionEndTime, + final Instant latestSessionStartTime) { + return backwardFindSessions( + keyFrom, + keyTo, + ApiUtils.validateMillisecondInstant(earliestSessionEndTime, + prepareMillisCheckFailMsgPrefix(earliestSessionEndTime, "earliestSessionEndTime")), + ApiUtils.validateMillisecondInstant(latestSessionStartTime, + prepareMillisCheckFailMsgPrefix(latestSessionStartTime, "latestSessionStartTime"))); + } + + default AGG fetchSession(final K key, final Instant earliestSessionEndTime, final Instant latestSessionStartTime) { + return fetchSession(key, + ApiUtils.validateMillisecondInstant(earliestSessionEndTime, + prepareMillisCheckFailMsgPrefix(earliestSessionEndTime, "startTime")), + ApiUtils.validateMillisecondInstant(latestSessionStartTime, + prepareMillisCheckFailMsgPrefix(latestSessionStartTime, "endTime"))); + } + /** * Remove the session aggregated with provided {@link Windowed} key from the store + * * @param sessionKey key of the session to remove * @throws NullPointerException If null is used for sessionKey. */ @@ -43,9 +107,10 @@ public interface SessionStore extends StateStore, ReadOnlySessionStore sessionKey, final AGG aggregate); diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingSessionStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingSessionStore.java index d0fe25a6050ca..1cfb8ce498a64 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingSessionStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingSessionStore.java @@ -271,17 +271,18 @@ public KeyValueIterator, byte[]> backwardFindSessions(final Byte } @Override - public byte[] fetchSession(final Bytes key, final long startTime, final long endTime) { + public byte[] fetchSession(final Bytes key, final long earliestSessionEndTime, final long latestSessionStartTime) { Objects.requireNonNull(key, "key cannot be null"); validateStoreOpen(); if (context.cache() == null) { - return wrapped().fetchSession(key, startTime, endTime); + return wrapped().fetchSession(key, earliestSessionEndTime, latestSessionStartTime); } else { - final Bytes bytesKey = SessionKeySchema.toBinary(key, startTime, endTime); + final Bytes bytesKey = SessionKeySchema.toBinary(key, earliestSessionEndTime, + latestSessionStartTime); final Bytes cacheKey = cacheFunction.cacheKey(bytesKey); final LRUCacheEntry entry = context.cache().get(cacheName, cacheKey); if (entry == null) { - return wrapped().fetchSession(key, startTime, endTime); + return wrapped().fetchSession(key, earliestSessionEndTime, latestSessionStartTime); } else { return entry.value(); } @@ -301,19 +302,19 @@ public KeyValueIterator, byte[]> backwardFetch(final Bytes key) } @Override - public KeyValueIterator, byte[]> fetch(final Bytes from, - final Bytes to) { - Objects.requireNonNull(from, "from cannot be null"); - Objects.requireNonNull(to, "to cannot be null"); - return findSessions(from, to, 0, Long.MAX_VALUE); + public KeyValueIterator, byte[]> fetch(final Bytes keyFrom, + final Bytes keyTo) { + Objects.requireNonNull(keyFrom, "keyFrom cannot be null"); + Objects.requireNonNull(keyTo, "keyTo cannot be null"); + return findSessions(keyFrom, keyTo, 0, Long.MAX_VALUE); } @Override - public KeyValueIterator, byte[]> backwardFetch(final Bytes from, - final Bytes to) { - Objects.requireNonNull(from, "from cannot be null"); - Objects.requireNonNull(to, "to cannot be null"); - return backwardFindSessions(from, to, 0, Long.MAX_VALUE); + public KeyValueIterator, byte[]> backwardFetch(final Bytes keyFrom, + final Bytes keyTo) { + Objects.requireNonNull(keyFrom, "keyFrom cannot be null"); + Objects.requireNonNull(keyTo, "keyTo cannot be null"); + return backwardFindSessions(keyFrom, keyTo, 0, Long.MAX_VALUE); } public void flush() { diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingSessionBytesStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingSessionBytesStore.java index f70eabd19f2f5..baa9846023a39 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingSessionBytesStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingSessionBytesStore.java @@ -91,8 +91,8 @@ public void put(final Windowed sessionKey, final byte[] aggregate) { } @Override - public byte[] fetchSession(final Bytes key, final long startTime, final long endTime) { - return wrapped().fetchSession(key, startTime, endTime); + public byte[] fetchSession(final Bytes key, final long earliestSessionEndTime, final long latestSessionStartTime) { + return wrapped().fetchSession(key, earliestSessionEndTime, latestSessionStartTime); } @Override @@ -106,12 +106,12 @@ public KeyValueIterator, byte[]> fetch(final Bytes key) { } @Override - public KeyValueIterator, byte[]> backwardFetch(final Bytes from, final Bytes to) { - return wrapped().backwardFetch(from, to); + public KeyValueIterator, byte[]> backwardFetch(final Bytes keyFrom, final Bytes keyTo) { + return wrapped().backwardFetch(keyFrom, keyTo); } @Override - public KeyValueIterator, byte[]> fetch(final Bytes from, final Bytes to) { - return wrapped().fetch(from, to); + public KeyValueIterator, byte[]> fetch(final Bytes keyFrom, final Bytes keyTo) { + return wrapped().fetch(keyFrom, keyTo); } } diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/CompositeReadOnlySessionStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/CompositeReadOnlySessionStore.java index 72233122a6286..fb5fb61ae6108 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/CompositeReadOnlySessionStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/CompositeReadOnlySessionStore.java @@ -155,12 +155,12 @@ public KeyValueIterator, V> backwardFindSessions(final K keyFrom, } @Override - public V fetchSession(final K key, final long startTime, final long endTime) { + public V fetchSession(final K key, final long earliestSessionEndTime, final long latestSessionStartTime) { Objects.requireNonNull(key, "key can't be null"); final List> stores = storeProvider.stores(storeName, queryableStoreType); for (final ReadOnlySessionStore store : stores) { try { - return store.fetchSession(key, startTime, endTime); + return store.fetchSession(key, earliestSessionEndTime, latestSessionStartTime); } catch (final InvalidStateStoreException ise) { throw new InvalidStateStoreException( "State store [" + storeName + "] is not available anymore" + @@ -220,10 +220,11 @@ public KeyValueIterator, V> backwardFetch(final K key) { } @Override - public KeyValueIterator, V> fetch(final K from, final K to) { - Objects.requireNonNull(from, "from can't be null"); - Objects.requireNonNull(to, "to can't be null"); - final NextIteratorFunction, V, ReadOnlySessionStore> nextIteratorFunction = store -> store.fetch(from, to); + public KeyValueIterator, V> fetch(final K keyFrom, final K keyTo) { + Objects.requireNonNull(keyFrom, "keyFrom can't be null"); + Objects.requireNonNull(keyTo, "keyTo can't be null"); + final NextIteratorFunction, V, ReadOnlySessionStore> nextIteratorFunction = + store -> store.fetch(keyFrom, keyTo); return new DelegatingPeekingKeyValueIterator<>(storeName, new CompositeKeyValueIterator<>( storeProvider.stores(storeName, queryableStoreType).iterator(), @@ -231,10 +232,11 @@ public KeyValueIterator, V> fetch(final K from, final K to) { } @Override - public KeyValueIterator, V> backwardFetch(final K from, final K to) { - Objects.requireNonNull(from, "from can't be null"); - Objects.requireNonNull(to, "to can't be null"); - final NextIteratorFunction, V, ReadOnlySessionStore> nextIteratorFunction = store -> store.backwardFetch(from, to); + public KeyValueIterator, V> backwardFetch(final K keyFrom, final K keyTo) { + Objects.requireNonNull(keyFrom, "keyFrom can't be null"); + Objects.requireNonNull(keyTo, "keyTo can't be null"); + final NextIteratorFunction, V, ReadOnlySessionStore> nextIteratorFunction = + store -> store.backwardFetch(keyFrom, keyTo); return new DelegatingPeekingKeyValueIterator<>( storeName, new CompositeKeyValueIterator<>( diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemorySessionStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemorySessionStore.java index b35eaa2204286..affd47ac14292 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemorySessionStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemorySessionStore.java @@ -150,18 +150,20 @@ public void remove(final Windowed sessionKey) { } @Override - public byte[] fetchSession(final Bytes key, final long startTime, final long endTime) { + public byte[] fetchSession(final Bytes key, + final long earliestSessionEndTime, + final long latestSessionStartTime) { removeExpiredSegments(); Objects.requireNonNull(key, "key cannot be null"); // Only need to search if the record hasn't expired yet - if (endTime > observedStreamTime - retentionPeriod) { - final ConcurrentNavigableMap> keyMap = endTimeMap.get(endTime); + if (latestSessionStartTime > observedStreamTime - retentionPeriod) { + final ConcurrentNavigableMap> keyMap = endTimeMap.get(latestSessionStartTime); if (keyMap != null) { final ConcurrentNavigableMap startTimeMap = keyMap.get(key); if (startTimeMap != null) { - return startTimeMap.get(startTime); + return startTimeMap.get(earliestSessionEndTime); } } } @@ -273,25 +275,26 @@ public KeyValueIterator, byte[]> backwardFetch(final Bytes key) } @Override - public KeyValueIterator, byte[]> fetch(final Bytes from, final Bytes to) { + public KeyValueIterator, byte[]> fetch(final Bytes keyFrom, final Bytes keyTo) { - Objects.requireNonNull(from, "from key cannot be null"); - Objects.requireNonNull(to, "to key cannot be null"); + Objects.requireNonNull(keyFrom, "from key cannot be null"); + Objects.requireNonNull(keyTo, "to key cannot be null"); removeExpiredSegments(); - return registerNewIterator(from, to, Long.MAX_VALUE, endTimeMap.entrySet().iterator(), false); + return registerNewIterator(keyFrom, keyTo, Long.MAX_VALUE, endTimeMap.entrySet().iterator(), false); } @Override - public KeyValueIterator, byte[]> backwardFetch(final Bytes from, final Bytes to) { - Objects.requireNonNull(from, "from key cannot be null"); - Objects.requireNonNull(to, "to key cannot be null"); + public KeyValueIterator, byte[]> backwardFetch(final Bytes keyFrom, final Bytes keyTo) { + Objects.requireNonNull(keyFrom, "from key cannot be null"); + Objects.requireNonNull(keyTo, "to key cannot be null"); removeExpiredSegments(); - return registerNewIterator(from, to, Long.MAX_VALUE, endTimeMap.descendingMap().entrySet().iterator(), true); + return registerNewIterator( + keyFrom, keyTo, Long.MAX_VALUE, endTimeMap.descendingMap().entrySet().iterator(), true); } @Override diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java index 7a31b170ea4e6..1fbc8db095799 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java @@ -200,12 +200,16 @@ public void remove(final Windowed sessionKey) { } @Override - public V fetchSession(final K key, final long startTime, final long endTime) { + public V fetchSession(final K key, final long earliestSessionEndTime, final long latestSessionStartTime) { Objects.requireNonNull(key, "key cannot be null"); return maybeMeasureLatency( () -> { final Bytes bytesKey = keyBytes(key); - final byte[] result = wrapped().fetchSession(bytesKey, startTime, endTime); + final byte[] result = wrapped().fetchSession( + bytesKey, + earliestSessionEndTime, + latestSessionStartTime + ); if (result == null) { return null; } @@ -240,12 +244,12 @@ public KeyValueIterator, V> backwardFetch(final K key) { } @Override - public KeyValueIterator, V> fetch(final K from, - final K to) { - Objects.requireNonNull(from, "from cannot be null"); - Objects.requireNonNull(to, "to cannot be null"); + public KeyValueIterator, V> fetch(final K keyFrom, + final K keyTo) { + Objects.requireNonNull(keyFrom, "keyFrom cannot be null"); + Objects.requireNonNull(keyTo, "keyTo cannot be null"); return new MeteredWindowedKeyValueIterator<>( - wrapped().fetch(keyBytes(from), keyBytes(to)), + wrapped().fetch(keyBytes(keyFrom), keyBytes(keyTo)), fetchSensor, streamsMetrics, serdes, @@ -253,12 +257,12 @@ public KeyValueIterator, V> fetch(final K from, } @Override - public KeyValueIterator, V> backwardFetch(final K from, - final K to) { - Objects.requireNonNull(from, "from cannot be null"); - Objects.requireNonNull(to, "to cannot be null"); + public KeyValueIterator, V> backwardFetch(final K keyFrom, + final K keyTo) { + Objects.requireNonNull(keyFrom, "keyFrom cannot be null"); + Objects.requireNonNull(keyTo, "keyTo cannot be null"); return new MeteredWindowedKeyValueIterator<>( - wrapped().backwardFetch(keyBytes(from), keyBytes(to)), + wrapped().backwardFetch(keyBytes(keyFrom), keyBytes(keyTo)), fetchSensor, streamsMetrics, serdes, diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBSessionStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBSessionStore.java index 338769abea4a5..f5d710894f767 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBSessionStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBSessionStore.java @@ -83,8 +83,14 @@ public KeyValueIterator, byte[]> backwardFindSessions(final Byte } @Override - public byte[] fetchSession(final Bytes key, final long startTime, final long endTime) { - return wrapped().get(SessionKeySchema.toBinary(key, startTime, endTime)); + public byte[] fetchSession(final Bytes key, + final long earliestSessionEndTime, + final long latestSessionStartTime) { + return wrapped().get(SessionKeySchema.toBinary( + key, + earliestSessionEndTime, + latestSessionStartTime + )); } @Override @@ -98,13 +104,13 @@ public KeyValueIterator, byte[]> backwardFetch(final Bytes key) } @Override - public KeyValueIterator, byte[]> fetch(final Bytes from, final Bytes to) { - return findSessions(from, to, 0, Long.MAX_VALUE); + public KeyValueIterator, byte[]> fetch(final Bytes keyFrom, final Bytes keyTo) { + return findSessions(keyFrom, keyTo, 0, Long.MAX_VALUE); } @Override - public KeyValueIterator, byte[]> backwardFetch(final Bytes from, final Bytes to) { - return backwardFindSessions(from, to, 0, Long.MAX_VALUE); + public KeyValueIterator, byte[]> backwardFetch(final Bytes keyFrom, final Bytes keyTo) { + return backwardFindSessions(keyFrom, keyTo, 0, Long.MAX_VALUE); } @Override diff --git a/streams/src/test/java/org/apache/kafka/test/ReadOnlySessionStoreStub.java b/streams/src/test/java/org/apache/kafka/test/ReadOnlySessionStoreStub.java index ff37e2586cd94..06640559cb4bb 100644 --- a/streams/src/test/java/org/apache/kafka/test/ReadOnlySessionStoreStub.java +++ b/streams/src/test/java/org/apache/kafka/test/ReadOnlySessionStoreStub.java @@ -63,7 +63,7 @@ public KeyValueIterator, V> backwardFindSessions(K keyFrom, K keyTo, } @Override - public V fetchSession(K key, long startTime, long endTime) { + public V fetchSession(K key, long earliestSessionEndTime, long latestSessionStartTime) { throw new UnsupportedOperationException("Moved from Session Store. Implement if needed"); } @@ -90,14 +90,15 @@ public KeyValueIterator, V> backwardFetch(K key) { } @Override - public KeyValueIterator, V> fetch(final K from, final K to) { + public KeyValueIterator, V> fetch(final K keyFrom, final K keyTo) { if (!open) { throw new InvalidStateStoreException("not open"); } - if (sessions.subMap(from, true, to, true).isEmpty()) { + if (sessions.subMap(keyFrom, true, keyTo, true).isEmpty()) { return new KeyValueIteratorStub<>(Collections., V>>emptyIterator()); } - final Iterator, V>>> keysIterator = sessions.subMap(from, true, to, true).values().iterator(); + final Iterator, V>>> keysIterator = sessions.subMap(keyFrom, true, + keyTo, true).values().iterator(); return new KeyValueIteratorStub<>( new Iterator, V>>() { @@ -124,15 +125,15 @@ public KeyValue, V> next() { } @Override - public KeyValueIterator, V> backwardFetch(K from, K to) { + public KeyValueIterator, V> backwardFetch(K keyFrom, K keyTo) { if (!open) { throw new InvalidStateStoreException("not open"); } - if (sessions.subMap(from, true, to, true).isEmpty()) { + if (sessions.subMap(keyFrom, true, keyTo, true).isEmpty()) { return new KeyValueIteratorStub<>(Collections.emptyIterator()); } final Iterator, V>>> keysIterator = - sessions.subMap(from, true, to, true).descendingMap().values().iterator(); + sessions.subMap(keyFrom, true, keyTo, true).descendingMap().values().iterator(); return new KeyValueIteratorStub<>( new Iterator, V>>() {