From 80fcb7ea5f186d552f2dfeec85cf0d36ab89d28e Mon Sep 17 00:00:00 2001 From: Jorge Esteban Quilcate Otoya Date: Tue, 23 Mar 2021 19:17:43 +0000 Subject: [PATCH 1/6] add instant-based methods to session store --- .../streams/state/ReadOnlySessionStore.java | 229 ++++++++++++++---- .../kafka/streams/state/SessionStore.java | 76 +++++- 2 files changed, 250 insertions(+), 55 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/state/ReadOnlySessionStore.java b/streams/src/main/java/org/apache/kafka/streams/state/ReadOnlySessionStore.java index 8874908d18072..8ef50a77d5d5c 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/ReadOnlySessionStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/ReadOnlySessionStore.java @@ -17,93 +17,204 @@ package org.apache.kafka.streams.state; +import java.time.Instant; import org.apache.kafka.streams.kstream.Windowed; /** - * A session store that only supports read operations. - * Implementations should be thread-safe as concurrent reads and writes - * are expected. + * A session store that only supports read operations. Implementations should be thread-safe as + * concurrent reads and writes are expected. * - * @param the key type + * @param the key type * @param the aggregated value type */ public interface ReadOnlySessionStore { /** - * Fetch any sessions with the matching key and the sessions end is ≥ earliestSessionEndTime and the sessions - * start is ≤ latestSessionStartTime iterating from earliest to latest. + * Fetch any sessions with the matching key and the sessions end is ≥ earliestSessionEndTime + * and the sessions start is ≤ latestSessionStartTime iterating from earliest to latest. *

* This iterator must be closed after use. * * @param key the key to return sessions for - * @param earliestSessionEndTime the end timestamp of the earliest session to search for, where iteration starts. - * @param latestSessionStartTime the end timestamp of the latest session to search for, where iteration ends. - * @return iterator of sessions with the matching key and aggregated values, from earliest to latest session time. + * @param earliestSessionEndTime the end timestamp of the earliest session to search for, where + * iteration starts. + * @param latestSessionStartTime the end timestamp of the latest session to search for, where + * iteration ends. + * @return iterator of sessions with the matching key and aggregated values, from earliest to + * latest session time. * @throws NullPointerException If null is used for key. */ default KeyValueIterator, AGG> findSessions(final K key, final long earliestSessionEndTime, final long latestSessionStartTime) { - throw new UnsupportedOperationException("This API is not supported by this implementation of ReadOnlySessionStore."); + throw new UnsupportedOperationException( + "This API is not supported by this implementation of ReadOnlySessionStore."); } /** - * Fetch any sessions with the matching key and the sessions end is ≥ earliestSessionEndTime and the sessions - * start is ≤ latestSessionStartTime iterating from latest to earliest. + * Fetch any sessions with the matching key and the sessions end is ≥ earliestSessionEndTime + * and the sessions start is ≤ latestSessionStartTime iterating from earliest to latest. *

* This iterator must be closed after use. * * @param key the key to return sessions for - * @param earliestSessionEndTime the end timestamp of the earliest session to search for, where iteration ends. - * @param latestSessionStartTime the end timestamp of the latest session to search for, where iteration starts. - * @return backward iterator of sessions with the matching key and aggregated values, from latest to earliest session time. + * @param earliestSessionEndTime the end timestamp of the earliest session to search for, where + * iteration starts. + * @param latestSessionStartTime the end timestamp of the latest session to search for, where + * iteration ends. + * @return iterator of sessions with the matching key and aggregated values, from earliest to + * latest session time. + * @throws NullPointerException If null is used for key. + */ + default KeyValueIterator, AGG> findSessions(final K key, + final Instant earliestSessionEndTime, + final Instant latestSessionStartTime) { + throw new UnsupportedOperationException( + "This API is not supported by this implementation of ReadOnlySessionStore."); + } + + /** + * Fetch any sessions with the matching key and the sessions end is ≥ earliestSessionEndTime + * and the sessions start is ≤ latestSessionStartTime iterating from latest to earliest. + *

+ * This iterator must be closed after use. + * + * @param key the key to return sessions for + * @param earliestSessionEndTime the end timestamp of the earliest session to search for, where + * iteration ends. + * @param latestSessionStartTime the end timestamp of the latest session to search for, where + * iteration starts. + * @return backward iterator of sessions with the matching key and aggregated values, from + * latest to earliest session time. * @throws NullPointerException If null is used for key. */ default KeyValueIterator, AGG> backwardFindSessions(final K key, final long earliestSessionEndTime, final long latestSessionStartTime) { - throw new UnsupportedOperationException("This API is not supported by this implementation of ReadOnlySessionStore."); + throw new UnsupportedOperationException( + "This API is not supported by this implementation of ReadOnlySessionStore."); + } + + /** + * Fetch any sessions with the matching key and the sessions end is ≥ earliestSessionEndTime + * and the sessions start is ≤ latestSessionStartTime iterating from latest to earliest. + *

+ * This iterator must be closed after use. + * + * @param key the key to return sessions for + * @param earliestSessionEndTime the end timestamp of the earliest session to search for, where + * iteration ends. + * @param latestSessionStartTime the end timestamp of the latest session to search for, where + * iteration starts. + * @return backward iterator of sessions with the matching key and aggregated values, from + * latest to earliest session time. + * @throws NullPointerException If null is used for key. + */ + default KeyValueIterator, AGG> backwardFindSessions(final K key, + final Instant earliestSessionEndTime, + final Instant latestSessionStartTime) { + throw new UnsupportedOperationException( + "This API is not supported by this implementation of ReadOnlySessionStore."); } /** - * Fetch any sessions in the given range of keys and the sessions end is ≥ earliestSessionEndTime and the sessions - * start is ≤ latestSessionStartTime iterating from earliest to latest. + * Fetch any sessions in the given range of keys and the sessions end is ≥ + * earliestSessionEndTime and the sessions start is ≤ latestSessionStartTime iterating from + * earliest to latest. *

* This iterator must be closed after use. * * @param keyFrom The first key that could be in the range * @param keyTo The last key that could be in the range - * @param earliestSessionEndTime the end timestamp of the earliest session to search for, where iteration starts. - * @param latestSessionStartTime the end timestamp of the latest session to search for, where iteration ends. - * @return iterator of sessions with the matching keys and aggregated values, from earliest to latest session time. + * @param earliestSessionEndTime the end timestamp of the earliest session to search for, where + * iteration starts. + * @param latestSessionStartTime the end timestamp of the latest session to search for, where + * iteration ends. + * @return iterator of sessions with the matching keys and aggregated values, from earliest to + * latest session time. * @throws NullPointerException If null is used for any key. */ default KeyValueIterator, AGG> findSessions(final K keyFrom, final K keyTo, final long earliestSessionEndTime, final long latestSessionStartTime) { - throw new UnsupportedOperationException("This API is not supported by this implementation of ReadOnlySessionStore."); + throw new UnsupportedOperationException( + "This API is not supported by this implementation of ReadOnlySessionStore."); } + /** + * Fetch any sessions in the given range of keys and the sessions end is ≥ + * earliestSessionEndTime and the sessions start is ≤ latestSessionStartTime iterating from + * earliest to latest. + *

+ * This iterator must be closed after use. + * + * @param keyFrom The first key that could be in the range + * @param keyTo The last key that could be in the range + * @param earliestSessionEndTime the end timestamp of the earliest session to search for, where + * iteration starts. + * @param latestSessionStartTime the end timestamp of the latest session to search for, where + * iteration ends. + * @return iterator of sessions with the matching keys and aggregated values, from earliest to + * latest session time. + * @throws NullPointerException If null is used for any key. + */ + default KeyValueIterator, AGG> findSessions(final K keyFrom, + final K keyTo, + final Instant earliestSessionEndTime, + final Instant latestSessionStartTime) { + throw new UnsupportedOperationException( + "This API is not supported by this implementation of ReadOnlySessionStore."); + } /** - * Fetch any sessions in the given range of keys and the sessions end is ≥ earliestSessionEndTime and the sessions - * start is ≤ latestSessionStartTime iterating from latest to earliest. + * Fetch any sessions in the given range of keys and the sessions end is ≥ + * earliestSessionEndTime and the sessions start is ≤ latestSessionStartTime iterating from + * latest to earliest. *

* This iterator must be closed after use. * * @param keyFrom The first key that could be in the range * @param keyTo The last key that could be in the range - * @param earliestSessionEndTime the end timestamp of the earliest session to search for, where iteration ends. - * @param latestSessionStartTime the end timestamp of the latest session to search for, where iteration starts. - * @return backward iterator of sessions with the matching keys and aggregated values, from latest to earliest session time. + * @param earliestSessionEndTime the end timestamp of the earliest session to search for, where + * iteration ends. + * @param latestSessionStartTime the end timestamp of the latest session to search for, where + * iteration starts. + * @return backward iterator of sessions with the matching keys and aggregated values, from + * latest to earliest session time. * @throws NullPointerException If null is used for any key. */ default KeyValueIterator, AGG> backwardFindSessions(final K keyFrom, final K keyTo, final long earliestSessionEndTime, final long latestSessionStartTime) { - throw new UnsupportedOperationException("This API is not supported by this implementation of ReadOnlySessionStore."); + throw new UnsupportedOperationException( + "This API is not supported by this implementation of ReadOnlySessionStore."); + } + + /** + * Fetch any sessions in the given range of keys and the sessions end is ≥ + * earliestSessionEndTime and the sessions start is ≤ latestSessionStartTime iterating from + * latest to earliest. + *

+ * This iterator must be closed after use. + * + * @param keyFrom The first key that could be in the range + * @param keyTo The last key that could be in the range + * @param earliestSessionEndTime the end timestamp of the earliest session to search for, where + * iteration ends. + * @param latestSessionStartTime the end timestamp of the latest session to search for, where + * iteration starts. + * @return backward iterator of sessions with the matching keys and aggregated values, from + * latest to earliest session time. + * @throws NullPointerException If null is used for any key. + */ + default KeyValueIterator, AGG> backwardFindSessions(final K keyFrom, + final K keyTo, + final Instant earliestSessionEndTime, + final Instant latestSessionStartTime) { + throw new UnsupportedOperationException( + "This API is not supported by this implementation of ReadOnlySessionStore."); } /** @@ -116,65 +227,85 @@ default KeyValueIterator, AGG> backwardFindSessions(final K keyFrom, * @throws NullPointerException If {@code null} is used for any key. */ default AGG fetchSession(final K key, final long startTime, final long endTime) { - throw new UnsupportedOperationException("This API is not supported by this implementation of ReadOnlySessionStore."); + throw new UnsupportedOperationException( + "This API is not supported by this implementation of ReadOnlySessionStore."); } /** - * Retrieve all aggregated sessions for the provided key. - * This iterator must be closed after use. + * Get the value of key from a single session. + * + * @param key the key to fetch + * @param startTime start timestamp of the session + * @param endTime end timestamp of the session + * @return The value or {@code null} if no session associated with the key can be found + * @throws NullPointerException If {@code null} is used for any key. + */ + default AGG fetchSession(final K key, final Instant startTime, final Instant endTime) { + throw new UnsupportedOperationException( + "This API is not supported by this implementation of ReadOnlySessionStore."); + } + + /** + * Retrieve all aggregated sessions for the provided key. This iterator must be closed after + * use. *

* For each key, the iterator guarantees ordering of sessions, starting from the oldest/earliest * available session to the newest/latest session. * - * @param key record key to find aggregated session values for - * @return KeyValueIterator containing all sessions for the provided key, from oldest to newest session. - * @throws NullPointerException If null is used for key. - * + * @param key record key to find aggregated session values for + * @return KeyValueIterator containing all sessions for the provided key, from oldest to newest + * session. + * @throws NullPointerException If null is used for key. */ KeyValueIterator, AGG> fetch(final K key); /** - * Retrieve all aggregated sessions for the provided key. - * This iterator must be closed after use. + * Retrieve all aggregated sessions for the provided key. This iterator must be closed after + * use. *

* For each key, the iterator guarantees ordering of sessions, starting from the newest/latest * available session to the oldest/earliest session. * * @param key record key to find aggregated session values for - * @return backward KeyValueIterator containing all sessions for the provided key, from newest to oldest session. + * @return backward KeyValueIterator containing all sessions for the provided key, from newest + * to oldest session. * @throws NullPointerException If null is used for key. */ default KeyValueIterator, AGG> backwardFetch(final K key) { - throw new UnsupportedOperationException("This API is not supported by this implementation of ReadOnlySessionStore."); + throw new UnsupportedOperationException( + "This API is not supported by this implementation of ReadOnlySessionStore."); } /** - * Retrieve all aggregated sessions for the given range of keys. - * This iterator must be closed after use. + * Retrieve all aggregated sessions for the given range of keys. This iterator must be closed + * after use. *

* For each key, the iterator guarantees ordering of sessions, starting from the oldest/earliest * available session to the newest/latest session. * - * @param from first key in the range to find aggregated session values for - * @param to last key in the range to find aggregated session values for - * @return KeyValueIterator containing all sessions for the provided key, from oldest to newest session. - * @throws NullPointerException If null is used for any of the keys. + * @param from first key in the range to find aggregated session values for + * @param to last key in the range to find aggregated session values for + * @return KeyValueIterator containing all sessions for the provided key, from oldest to newest + * session. + * @throws NullPointerException If null is used for any of the keys. */ KeyValueIterator, AGG> fetch(final K from, final K to); /** - * Retrieve all aggregated sessions for the given range of keys. - * This iterator must be closed after use. + * Retrieve all aggregated sessions for the given range of keys. This iterator must be closed + * after use. *

* For each key, the iterator guarantees ordering of sessions, starting from the newest/latest * available session to the oldest/earliest session. * * @param from first key in the range to find aggregated session values for * @param to last key in the range to find aggregated session values for - * @return backward KeyValueIterator containing all sessions for the provided key, from newest to oldest session. + * @return backward KeyValueIterator containing all sessions for the provided key, from newest + * to oldest session. * @throws NullPointerException If null is used for any of the keys. */ default KeyValueIterator, AGG> backwardFetch(final K from, final K to) { - throw new UnsupportedOperationException("This API is not supported by this implementation of ReadOnlySessionStore."); + throw new UnsupportedOperationException( + "This API is not supported by this implementation of ReadOnlySessionStore."); } } diff --git a/streams/src/main/java/org/apache/kafka/streams/state/SessionStore.java b/streams/src/main/java/org/apache/kafka/streams/state/SessionStore.java index 47f48d5e8143c..65ea4599319db 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/SessionStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/SessionStore.java @@ -16,6 +16,10 @@ */ package org.apache.kafka.streams.state; +import static org.apache.kafka.streams.internals.ApiUtils.prepareMillisCheckFailMsgPrefix; + +import java.time.Instant; +import org.apache.kafka.streams.internals.ApiUtils; import org.apache.kafka.streams.kstream.Window; import org.apache.kafka.streams.kstream.Windowed; import org.apache.kafka.streams.processor.StateStore; @@ -23,19 +27,78 @@ /** * Interface for storing the aggregated values of sessions. *

- * The key is internally represented as {@link Windowed Windowed<K>} that comprises the plain key - * and the {@link Window} that represents window start- and end-timestamp. + * The key is internally represented as {@link Windowed Windowed<K>} that comprises the plain + * key and the {@link Window} that represents window start- and end-timestamp. *

- * If two sessions are merged, a new session with new start- and end-timestamp must be inserted into the store - * while the two old sessions must be deleted. + * If two sessions are merged, a new session with new start- and end-timestamp must be inserted into + * the store while the two old sessions must be deleted. * * @param type of the record keys * @param type of the aggregated values */ public interface SessionStore extends StateStore, ReadOnlySessionStore { + @Override + default KeyValueIterator, AGG> findSessions(final K key, + final Instant earliestSessionEndTime, + final Instant latestSessionStartTime) { + return findSessions( + key, + ApiUtils.validateMillisecondInstant(earliestSessionEndTime, + prepareMillisCheckFailMsgPrefix(earliestSessionEndTime, "earliestSessionEndTime")), + ApiUtils.validateMillisecondInstant(latestSessionStartTime, + prepareMillisCheckFailMsgPrefix(latestSessionStartTime, "latestSessionStartTime"))); + } + + @Override + default KeyValueIterator, AGG> backwardFindSessions(final K key, + final Instant earliestSessionEndTime, + final Instant latestSessionStartTime) { + return backwardFindSessions( + key, + ApiUtils.validateMillisecondInstant(earliestSessionEndTime, + prepareMillisCheckFailMsgPrefix(earliestSessionEndTime, "earliestSessionEndTime")), + ApiUtils.validateMillisecondInstant(latestSessionStartTime, + prepareMillisCheckFailMsgPrefix(latestSessionStartTime, "latestSessionStartTime"))); + } + + default KeyValueIterator, AGG> findSessions(final K keyFrom, + final K keyTo, + final Instant earliestSessionEndTime, + final Instant latestSessionStartTime) { + return findSessions( + keyFrom, + keyTo, + ApiUtils.validateMillisecondInstant(earliestSessionEndTime, + prepareMillisCheckFailMsgPrefix(earliestSessionEndTime, "earliestSessionEndTime")), + ApiUtils.validateMillisecondInstant(latestSessionStartTime, + prepareMillisCheckFailMsgPrefix(latestSessionStartTime, "latestSessionStartTime"))); + } + + default KeyValueIterator, AGG> backwardFindSessions(final K keyFrom, + final K keyTo, + final Instant earliestSessionEndTime, + final Instant latestSessionStartTime) { + return backwardFindSessions( + keyFrom, + keyTo, + ApiUtils.validateMillisecondInstant(earliestSessionEndTime, + prepareMillisCheckFailMsgPrefix(earliestSessionEndTime, "earliestSessionEndTime")), + ApiUtils.validateMillisecondInstant(latestSessionStartTime, + prepareMillisCheckFailMsgPrefix(latestSessionStartTime, "latestSessionStartTime"))); + } + + default AGG fetchSession(final K key, final Instant startTime, final Instant endTime) { + return fetchSession(key, + ApiUtils.validateMillisecondInstant(startTime, + prepareMillisCheckFailMsgPrefix(startTime, "startTime")), + ApiUtils.validateMillisecondInstant(endTime, + prepareMillisCheckFailMsgPrefix(endTime, "endTime"))); + } + /** * Remove the session aggregated with provided {@link Windowed} key from the store + * * @param sessionKey key of the session to remove * @throws NullPointerException If null is used for sessionKey. */ @@ -43,9 +106,10 @@ public interface SessionStore extends StateStore, ReadOnlySessionStore sessionKey, final AGG aggregate); From 561ec90e0c990dc666f6ffa8f084842d4124fe94 Mon Sep 17 00:00:00 2001 From: Jorge Esteban Quilcate Otoya Date: Thu, 6 May 2021 13:19:02 +0100 Subject: [PATCH 2/6] reorder imports --- .../apache/kafka/streams/state/ReadOnlySessionStore.java | 3 ++- .../java/org/apache/kafka/streams/state/SessionStore.java | 7 ++++--- 2 files changed, 6 insertions(+), 4 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/state/ReadOnlySessionStore.java b/streams/src/main/java/org/apache/kafka/streams/state/ReadOnlySessionStore.java index 8ef50a77d5d5c..45eec5486ee24 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/ReadOnlySessionStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/ReadOnlySessionStore.java @@ -17,9 +17,10 @@ package org.apache.kafka.streams.state; -import java.time.Instant; import org.apache.kafka.streams.kstream.Windowed; +import java.time.Instant; + /** * A session store that only supports read operations. Implementations should be thread-safe as * concurrent reads and writes are expected. diff --git a/streams/src/main/java/org/apache/kafka/streams/state/SessionStore.java b/streams/src/main/java/org/apache/kafka/streams/state/SessionStore.java index 65ea4599319db..5604cef150d2a 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/SessionStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/SessionStore.java @@ -16,14 +16,15 @@ */ package org.apache.kafka.streams.state; -import static org.apache.kafka.streams.internals.ApiUtils.prepareMillisCheckFailMsgPrefix; - -import java.time.Instant; import org.apache.kafka.streams.internals.ApiUtils; import org.apache.kafka.streams.kstream.Window; import org.apache.kafka.streams.kstream.Windowed; import org.apache.kafka.streams.processor.StateStore; +import java.time.Instant; + +import static org.apache.kafka.streams.internals.ApiUtils.prepareMillisCheckFailMsgPrefix; + /** * Interface for storing the aggregated values of sessions. *

From a9ce07147bd72d836b2279f357a14ea36acd9dc5 Mon Sep 17 00:00:00 2001 From: Jorge Esteban Quilcate Otoya Date: Thu, 6 May 2021 17:45:35 +0100 Subject: [PATCH 3/6] align var names --- .../internals/AbstractReadOnlyDecorator.java | 10 +++--- .../internals/AbstractReadWriteDecorator.java | 12 +++---- .../streams/state/ReadOnlySessionStore.java | 32 +++++++++++-------- .../kafka/streams/state/SessionStore.java | 10 +++--- .../state/internals/CachingSessionStore.java | 29 +++++++++-------- .../ChangeLoggingSessionBytesStore.java | 12 +++---- .../CompositeReadOnlySessionStore.java | 22 +++++++------ .../state/internals/InMemorySessionStore.java | 26 ++++++++------- .../state/internals/MeteredSessionStore.java | 25 ++++++++------- .../state/internals/RocksDBSessionStore.java | 13 ++++---- .../kafka/test/ReadOnlySessionStoreStub.java | 15 +++++---- 11 files changed, 109 insertions(+), 97 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractReadOnlyDecorator.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractReadOnlyDecorator.java index 6c0c97599df65..be47e89c765c1 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractReadOnlyDecorator.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractReadOnlyDecorator.java @@ -273,8 +273,8 @@ public void put(final Windowed sessionKey, } @Override - public AGG fetchSession(final K key, final long startTime, final long endTime) { - return wrapped().fetchSession(key, startTime, endTime); + public AGG fetchSession(final K key, final long earliestSessionEndTime, final long latestSessionStartTime) { + return wrapped().fetchSession(key, earliestSessionEndTime, latestSessionStartTime); } @Override @@ -283,9 +283,9 @@ public KeyValueIterator, AGG> fetch(final K key) { } @Override - public KeyValueIterator, AGG> fetch(final K from, - final K to) { - return wrapped().fetch(from, to); + public KeyValueIterator, AGG> fetch(final K keyFrom, + final K keyTo) { + return wrapped().fetch(keyFrom, keyTo); } } } diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractReadWriteDecorator.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractReadWriteDecorator.java index eac606cdbe302..60fa8f436fbb6 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractReadWriteDecorator.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractReadWriteDecorator.java @@ -265,9 +265,9 @@ public void put(final Windowed sessionKey, @Override public AGG fetchSession(final K key, - final long startTime, - final long endTime) { - return wrapped().fetchSession(key, startTime, endTime); + final long earliestSessionEndTime, + final long latestSessionStartTime) { + return wrapped().fetchSession(key, earliestSessionEndTime, latestSessionStartTime); } @Override @@ -276,9 +276,9 @@ public KeyValueIterator, AGG> fetch(final K key) { } @Override - public KeyValueIterator, AGG> fetch(final K from, - final K to) { - return wrapped().fetch(from, to); + public KeyValueIterator, AGG> fetch(final K keyFrom, + final K keyTo) { + return wrapped().fetch(keyFrom, keyTo); } } } diff --git a/streams/src/main/java/org/apache/kafka/streams/state/ReadOnlySessionStore.java b/streams/src/main/java/org/apache/kafka/streams/state/ReadOnlySessionStore.java index 45eec5486ee24..0ade24286fdd6 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/ReadOnlySessionStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/ReadOnlySessionStore.java @@ -221,13 +221,15 @@ default KeyValueIterator, AGG> backwardFindSessions(final K keyFrom, /** * Get the value of key from a single session. * - * @param key the key to fetch - * @param startTime start timestamp of the session - * @param endTime end timestamp of the session + * @param key the key to fetch + * @param earliestSessionEndTime start timestamp of the session + * @param latestSessionStartTime end timestamp of the session * @return The value or {@code null} if no session associated with the key can be found * @throws NullPointerException If {@code null} is used for any key. */ - default AGG fetchSession(final K key, final long startTime, final long endTime) { + default AGG fetchSession(final K key, + final long earliestSessionEndTime, + final long latestSessionStartTime) { throw new UnsupportedOperationException( "This API is not supported by this implementation of ReadOnlySessionStore."); } @@ -235,13 +237,15 @@ default AGG fetchSession(final K key, final long startTime, final long endTime) /** * Get the value of key from a single session. * - * @param key the key to fetch - * @param startTime start timestamp of the session - * @param endTime end timestamp of the session + * @param key the key to fetch + * @param earliestSessionEndTime start timestamp of the session + * @param latestSessionStartTime end timestamp of the session * @return The value or {@code null} if no session associated with the key can be found * @throws NullPointerException If {@code null} is used for any key. */ - default AGG fetchSession(final K key, final Instant startTime, final Instant endTime) { + default AGG fetchSession(final K key, + final Instant earliestSessionEndTime, + final Instant latestSessionStartTime) { throw new UnsupportedOperationException( "This API is not supported by this implementation of ReadOnlySessionStore."); } @@ -284,13 +288,13 @@ default KeyValueIterator, AGG> backwardFetch(final K key) { * For each key, the iterator guarantees ordering of sessions, starting from the oldest/earliest * available session to the newest/latest session. * - * @param from first key in the range to find aggregated session values for - * @param to last key in the range to find aggregated session values for + * @param keyFrom first key in the range to find aggregated session values for + * @param keyTo last key in the range to find aggregated session values for * @return KeyValueIterator containing all sessions for the provided key, from oldest to newest * session. * @throws NullPointerException If null is used for any of the keys. */ - KeyValueIterator, AGG> fetch(final K from, final K to); + KeyValueIterator, AGG> fetch(final K keyFrom, final K keyTo); /** * Retrieve all aggregated sessions for the given range of keys. This iterator must be closed @@ -299,13 +303,13 @@ default KeyValueIterator, AGG> backwardFetch(final K key) { * For each key, the iterator guarantees ordering of sessions, starting from the newest/latest * available session to the oldest/earliest session. * - * @param from first key in the range to find aggregated session values for - * @param to last key in the range to find aggregated session values for + * @param keyFrom first key in the range to find aggregated session values for + * @param keyTo last key in the range to find aggregated session values for * @return backward KeyValueIterator containing all sessions for the provided key, from newest * to oldest session. * @throws NullPointerException If null is used for any of the keys. */ - default KeyValueIterator, AGG> backwardFetch(final K from, final K to) { + default KeyValueIterator, AGG> backwardFetch(final K keyFrom, final K keyTo) { throw new UnsupportedOperationException( "This API is not supported by this implementation of ReadOnlySessionStore."); } diff --git a/streams/src/main/java/org/apache/kafka/streams/state/SessionStore.java b/streams/src/main/java/org/apache/kafka/streams/state/SessionStore.java index 5604cef150d2a..926cddc4d2a43 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/SessionStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/SessionStore.java @@ -89,12 +89,12 @@ default KeyValueIterator, AGG> backwardFindSessions(final K keyFrom, prepareMillisCheckFailMsgPrefix(latestSessionStartTime, "latestSessionStartTime"))); } - default AGG fetchSession(final K key, final Instant startTime, final Instant endTime) { + default AGG fetchSession(final K key, final Instant earliestSessionEndTime, final Instant latestSessionStartTime) { return fetchSession(key, - ApiUtils.validateMillisecondInstant(startTime, - prepareMillisCheckFailMsgPrefix(startTime, "startTime")), - ApiUtils.validateMillisecondInstant(endTime, - prepareMillisCheckFailMsgPrefix(endTime, "endTime"))); + ApiUtils.validateMillisecondInstant(earliestSessionEndTime, + prepareMillisCheckFailMsgPrefix(earliestSessionEndTime, "startTime")), + ApiUtils.validateMillisecondInstant(latestSessionStartTime, + prepareMillisCheckFailMsgPrefix(latestSessionStartTime, "endTime"))); } /** diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingSessionStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingSessionStore.java index d0fe25a6050ca..7e41e7d422acb 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingSessionStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingSessionStore.java @@ -271,17 +271,18 @@ public KeyValueIterator, byte[]> backwardFindSessions(final Byte } @Override - public byte[] fetchSession(final Bytes key, final long startTime, final long endTime) { + public byte[] fetchSession(final Bytes key, final long earliestSessionEndTime, final long latestSessionStartTime) { Objects.requireNonNull(key, "key cannot be null"); validateStoreOpen(); if (context.cache() == null) { - return wrapped().fetchSession(key, startTime, endTime); + return wrapped().fetchSession(key, earliestSessionEndTime, latestSessionStartTime); } else { - final Bytes bytesKey = SessionKeySchema.toBinary(key, startTime, endTime); + final Bytes bytesKey = SessionKeySchema.toBinary(key, earliestSessionEndTime, + latestSessionStartTime); final Bytes cacheKey = cacheFunction.cacheKey(bytesKey); final LRUCacheEntry entry = context.cache().get(cacheName, cacheKey); if (entry == null) { - return wrapped().fetchSession(key, startTime, endTime); + return wrapped().fetchSession(key, earliestSessionEndTime, latestSessionStartTime); } else { return entry.value(); } @@ -301,19 +302,19 @@ public KeyValueIterator, byte[]> backwardFetch(final Bytes key) } @Override - public KeyValueIterator, byte[]> fetch(final Bytes from, - final Bytes to) { - Objects.requireNonNull(from, "from cannot be null"); - Objects.requireNonNull(to, "to cannot be null"); - return findSessions(from, to, 0, Long.MAX_VALUE); + public KeyValueIterator, byte[]> fetch(final Bytes keyFrom, + final Bytes keyTo) { + Objects.requireNonNull(keyFrom, "from cannot be null"); + Objects.requireNonNull(keyTo, "to cannot be null"); + return findSessions(keyFrom, keyTo, 0, Long.MAX_VALUE); } @Override - public KeyValueIterator, byte[]> backwardFetch(final Bytes from, - final Bytes to) { - Objects.requireNonNull(from, "from cannot be null"); - Objects.requireNonNull(to, "to cannot be null"); - return backwardFindSessions(from, to, 0, Long.MAX_VALUE); + public KeyValueIterator, byte[]> backwardFetch(final Bytes keyFrom, + final Bytes keyTo) { + Objects.requireNonNull(keyFrom, "from cannot be null"); + Objects.requireNonNull(keyTo, "to cannot be null"); + return backwardFindSessions(keyFrom, keyTo, 0, Long.MAX_VALUE); } public void flush() { diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingSessionBytesStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingSessionBytesStore.java index f70eabd19f2f5..baa9846023a39 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingSessionBytesStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingSessionBytesStore.java @@ -91,8 +91,8 @@ public void put(final Windowed sessionKey, final byte[] aggregate) { } @Override - public byte[] fetchSession(final Bytes key, final long startTime, final long endTime) { - return wrapped().fetchSession(key, startTime, endTime); + public byte[] fetchSession(final Bytes key, final long earliestSessionEndTime, final long latestSessionStartTime) { + return wrapped().fetchSession(key, earliestSessionEndTime, latestSessionStartTime); } @Override @@ -106,12 +106,12 @@ public KeyValueIterator, byte[]> fetch(final Bytes key) { } @Override - public KeyValueIterator, byte[]> backwardFetch(final Bytes from, final Bytes to) { - return wrapped().backwardFetch(from, to); + public KeyValueIterator, byte[]> backwardFetch(final Bytes keyFrom, final Bytes keyTo) { + return wrapped().backwardFetch(keyFrom, keyTo); } @Override - public KeyValueIterator, byte[]> fetch(final Bytes from, final Bytes to) { - return wrapped().fetch(from, to); + public KeyValueIterator, byte[]> fetch(final Bytes keyFrom, final Bytes keyTo) { + return wrapped().fetch(keyFrom, keyTo); } } diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/CompositeReadOnlySessionStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/CompositeReadOnlySessionStore.java index 72233122a6286..df90f29bf52bb 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/CompositeReadOnlySessionStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/CompositeReadOnlySessionStore.java @@ -155,12 +155,12 @@ public KeyValueIterator, V> backwardFindSessions(final K keyFrom, } @Override - public V fetchSession(final K key, final long startTime, final long endTime) { + public V fetchSession(final K key, final long earliestSessionEndTime, final long latestSessionStartTime) { Objects.requireNonNull(key, "key can't be null"); final List> stores = storeProvider.stores(storeName, queryableStoreType); for (final ReadOnlySessionStore store : stores) { try { - return store.fetchSession(key, startTime, endTime); + return store.fetchSession(key, earliestSessionEndTime, latestSessionStartTime); } catch (final InvalidStateStoreException ise) { throw new InvalidStateStoreException( "State store [" + storeName + "] is not available anymore" + @@ -220,10 +220,11 @@ public KeyValueIterator, V> backwardFetch(final K key) { } @Override - public KeyValueIterator, V> fetch(final K from, final K to) { - Objects.requireNonNull(from, "from can't be null"); - Objects.requireNonNull(to, "to can't be null"); - final NextIteratorFunction, V, ReadOnlySessionStore> nextIteratorFunction = store -> store.fetch(from, to); + public KeyValueIterator, V> fetch(final K keyFrom, final K keyTo) { + Objects.requireNonNull(keyFrom, "from can't be null"); + Objects.requireNonNull(keyTo, "to can't be null"); + final NextIteratorFunction, V, ReadOnlySessionStore> nextIteratorFunction = store -> store.fetch( + keyFrom, keyTo); return new DelegatingPeekingKeyValueIterator<>(storeName, new CompositeKeyValueIterator<>( storeProvider.stores(storeName, queryableStoreType).iterator(), @@ -231,10 +232,11 @@ public KeyValueIterator, V> fetch(final K from, final K to) { } @Override - public KeyValueIterator, V> backwardFetch(final K from, final K to) { - Objects.requireNonNull(from, "from can't be null"); - Objects.requireNonNull(to, "to can't be null"); - final NextIteratorFunction, V, ReadOnlySessionStore> nextIteratorFunction = store -> store.backwardFetch(from, to); + public KeyValueIterator, V> backwardFetch(final K keyFrom, final K keyTo) { + Objects.requireNonNull(keyFrom, "from can't be null"); + Objects.requireNonNull(keyTo, "to can't be null"); + final NextIteratorFunction, V, ReadOnlySessionStore> nextIteratorFunction = store -> store.backwardFetch( + keyFrom, keyTo); return new DelegatingPeekingKeyValueIterator<>( storeName, new CompositeKeyValueIterator<>( diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemorySessionStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemorySessionStore.java index b35eaa2204286..50b78d6c05078 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemorySessionStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemorySessionStore.java @@ -150,18 +150,19 @@ public void remove(final Windowed sessionKey) { } @Override - public byte[] fetchSession(final Bytes key, final long startTime, final long endTime) { + public byte[] fetchSession(final Bytes key, final long earliestSessionEndTime, final long latestSessionStartTime) { removeExpiredSegments(); Objects.requireNonNull(key, "key cannot be null"); // Only need to search if the record hasn't expired yet - if (endTime > observedStreamTime - retentionPeriod) { - final ConcurrentNavigableMap> keyMap = endTimeMap.get(endTime); + if (latestSessionStartTime > observedStreamTime - retentionPeriod) { + final ConcurrentNavigableMap> keyMap = endTimeMap.get( + latestSessionStartTime); if (keyMap != null) { final ConcurrentNavigableMap startTimeMap = keyMap.get(key); if (startTimeMap != null) { - return startTimeMap.get(startTime); + return startTimeMap.get(earliestSessionEndTime); } } } @@ -273,25 +274,26 @@ public KeyValueIterator, byte[]> backwardFetch(final Bytes key) } @Override - public KeyValueIterator, byte[]> fetch(final Bytes from, final Bytes to) { + public KeyValueIterator, byte[]> fetch(final Bytes keyFrom, final Bytes keyTo) { - Objects.requireNonNull(from, "from key cannot be null"); - Objects.requireNonNull(to, "to key cannot be null"); + Objects.requireNonNull(keyFrom, "from key cannot be null"); + Objects.requireNonNull(keyTo, "to key cannot be null"); removeExpiredSegments(); - return registerNewIterator(from, to, Long.MAX_VALUE, endTimeMap.entrySet().iterator(), false); + return registerNewIterator(keyFrom, keyTo, Long.MAX_VALUE, endTimeMap.entrySet().iterator(), false); } @Override - public KeyValueIterator, byte[]> backwardFetch(final Bytes from, final Bytes to) { - Objects.requireNonNull(from, "from key cannot be null"); - Objects.requireNonNull(to, "to key cannot be null"); + public KeyValueIterator, byte[]> backwardFetch(final Bytes keyFrom, final Bytes keyTo) { + Objects.requireNonNull(keyFrom, "from key cannot be null"); + Objects.requireNonNull(keyTo, "to key cannot be null"); removeExpiredSegments(); - return registerNewIterator(from, to, Long.MAX_VALUE, endTimeMap.descendingMap().entrySet().iterator(), true); + return registerNewIterator( + keyFrom, keyTo, Long.MAX_VALUE, endTimeMap.descendingMap().entrySet().iterator(), true); } @Override diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java index 7a31b170ea4e6..dff6bacc67c6a 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java @@ -200,12 +200,13 @@ public void remove(final Windowed sessionKey) { } @Override - public V fetchSession(final K key, final long startTime, final long endTime) { + public V fetchSession(final K key, final long earliestSessionEndTime, final long latestSessionStartTime) { Objects.requireNonNull(key, "key cannot be null"); return maybeMeasureLatency( () -> { final Bytes bytesKey = keyBytes(key); - final byte[] result = wrapped().fetchSession(bytesKey, startTime, endTime); + final byte[] result = wrapped().fetchSession(bytesKey, earliestSessionEndTime, + latestSessionStartTime); if (result == null) { return null; } @@ -240,12 +241,12 @@ public KeyValueIterator, V> backwardFetch(final K key) { } @Override - public KeyValueIterator, V> fetch(final K from, - final K to) { - Objects.requireNonNull(from, "from cannot be null"); - Objects.requireNonNull(to, "to cannot be null"); + public KeyValueIterator, V> fetch(final K keyFrom, + final K keyTo) { + Objects.requireNonNull(keyFrom, "from cannot be null"); + Objects.requireNonNull(keyTo, "to cannot be null"); return new MeteredWindowedKeyValueIterator<>( - wrapped().fetch(keyBytes(from), keyBytes(to)), + wrapped().fetch(keyBytes(keyFrom), keyBytes(keyTo)), fetchSensor, streamsMetrics, serdes, @@ -253,12 +254,12 @@ public KeyValueIterator, V> fetch(final K from, } @Override - public KeyValueIterator, V> backwardFetch(final K from, - final K to) { - Objects.requireNonNull(from, "from cannot be null"); - Objects.requireNonNull(to, "to cannot be null"); + public KeyValueIterator, V> backwardFetch(final K keyFrom, + final K keyTo) { + Objects.requireNonNull(keyFrom, "from cannot be null"); + Objects.requireNonNull(keyTo, "to cannot be null"); return new MeteredWindowedKeyValueIterator<>( - wrapped().backwardFetch(keyBytes(from), keyBytes(to)), + wrapped().backwardFetch(keyBytes(keyFrom), keyBytes(keyTo)), fetchSensor, streamsMetrics, serdes, diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBSessionStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBSessionStore.java index 338769abea4a5..c7b8a29ab515d 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBSessionStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBSessionStore.java @@ -83,8 +83,9 @@ public KeyValueIterator, byte[]> backwardFindSessions(final Byte } @Override - public byte[] fetchSession(final Bytes key, final long startTime, final long endTime) { - return wrapped().get(SessionKeySchema.toBinary(key, startTime, endTime)); + public byte[] fetchSession(final Bytes key, final long earliestSessionEndTime, final long latestSessionStartTime) { + return wrapped().get(SessionKeySchema.toBinary(key, earliestSessionEndTime, + latestSessionStartTime)); } @Override @@ -98,13 +99,13 @@ public KeyValueIterator, byte[]> backwardFetch(final Bytes key) } @Override - public KeyValueIterator, byte[]> fetch(final Bytes from, final Bytes to) { - return findSessions(from, to, 0, Long.MAX_VALUE); + public KeyValueIterator, byte[]> fetch(final Bytes keyFrom, final Bytes keyTo) { + return findSessions(keyFrom, keyTo, 0, Long.MAX_VALUE); } @Override - public KeyValueIterator, byte[]> backwardFetch(final Bytes from, final Bytes to) { - return backwardFindSessions(from, to, 0, Long.MAX_VALUE); + public KeyValueIterator, byte[]> backwardFetch(final Bytes keyFrom, final Bytes keyTo) { + return backwardFindSessions(keyFrom, keyTo, 0, Long.MAX_VALUE); } @Override diff --git a/streams/src/test/java/org/apache/kafka/test/ReadOnlySessionStoreStub.java b/streams/src/test/java/org/apache/kafka/test/ReadOnlySessionStoreStub.java index ff37e2586cd94..06640559cb4bb 100644 --- a/streams/src/test/java/org/apache/kafka/test/ReadOnlySessionStoreStub.java +++ b/streams/src/test/java/org/apache/kafka/test/ReadOnlySessionStoreStub.java @@ -63,7 +63,7 @@ public KeyValueIterator, V> backwardFindSessions(K keyFrom, K keyTo, } @Override - public V fetchSession(K key, long startTime, long endTime) { + public V fetchSession(K key, long earliestSessionEndTime, long latestSessionStartTime) { throw new UnsupportedOperationException("Moved from Session Store. Implement if needed"); } @@ -90,14 +90,15 @@ public KeyValueIterator, V> backwardFetch(K key) { } @Override - public KeyValueIterator, V> fetch(final K from, final K to) { + public KeyValueIterator, V> fetch(final K keyFrom, final K keyTo) { if (!open) { throw new InvalidStateStoreException("not open"); } - if (sessions.subMap(from, true, to, true).isEmpty()) { + if (sessions.subMap(keyFrom, true, keyTo, true).isEmpty()) { return new KeyValueIteratorStub<>(Collections., V>>emptyIterator()); } - final Iterator, V>>> keysIterator = sessions.subMap(from, true, to, true).values().iterator(); + final Iterator, V>>> keysIterator = sessions.subMap(keyFrom, true, + keyTo, true).values().iterator(); return new KeyValueIteratorStub<>( new Iterator, V>>() { @@ -124,15 +125,15 @@ public KeyValue, V> next() { } @Override - public KeyValueIterator, V> backwardFetch(K from, K to) { + public KeyValueIterator, V> backwardFetch(K keyFrom, K keyTo) { if (!open) { throw new InvalidStateStoreException("not open"); } - if (sessions.subMap(from, true, to, true).isEmpty()) { + if (sessions.subMap(keyFrom, true, keyTo, true).isEmpty()) { return new KeyValueIteratorStub<>(Collections.emptyIterator()); } final Iterator, V>>> keysIterator = - sessions.subMap(from, true, to, true).descendingMap().values().iterator(); + sessions.subMap(keyFrom, true, keyTo, true).descendingMap().values().iterator(); return new KeyValueIteratorStub<>( new Iterator, V>>() { From 23a33fbbdc14ade61cc6abbd0f360a2413126c97 Mon Sep 17 00:00:00 2001 From: Jorge Esteban Quilcate Otoya Date: Thu, 6 May 2021 17:54:41 +0100 Subject: [PATCH 4/6] align var comments --- .../streams/state/internals/CachingSessionStore.java | 8 ++++---- .../internals/CompositeReadOnlySessionStore.java | 8 ++++---- .../streams/state/internals/MeteredSessionStore.java | 8 ++++---- .../streams/state/internals/RocksDBSessionStore.java | 11 ++++++++--- 4 files changed, 20 insertions(+), 15 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingSessionStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingSessionStore.java index 7e41e7d422acb..1cfb8ce498a64 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingSessionStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingSessionStore.java @@ -304,16 +304,16 @@ public KeyValueIterator, byte[]> backwardFetch(final Bytes key) @Override public KeyValueIterator, byte[]> fetch(final Bytes keyFrom, final Bytes keyTo) { - Objects.requireNonNull(keyFrom, "from cannot be null"); - Objects.requireNonNull(keyTo, "to cannot be null"); + Objects.requireNonNull(keyFrom, "keyFrom cannot be null"); + Objects.requireNonNull(keyTo, "keyTo cannot be null"); return findSessions(keyFrom, keyTo, 0, Long.MAX_VALUE); } @Override public KeyValueIterator, byte[]> backwardFetch(final Bytes keyFrom, final Bytes keyTo) { - Objects.requireNonNull(keyFrom, "from cannot be null"); - Objects.requireNonNull(keyTo, "to cannot be null"); + Objects.requireNonNull(keyFrom, "keyFrom cannot be null"); + Objects.requireNonNull(keyTo, "keyTo cannot be null"); return backwardFindSessions(keyFrom, keyTo, 0, Long.MAX_VALUE); } diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/CompositeReadOnlySessionStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/CompositeReadOnlySessionStore.java index df90f29bf52bb..d5c1ac7ce74d2 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/CompositeReadOnlySessionStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/CompositeReadOnlySessionStore.java @@ -221,8 +221,8 @@ public KeyValueIterator, V> backwardFetch(final K key) { @Override public KeyValueIterator, V> fetch(final K keyFrom, final K keyTo) { - Objects.requireNonNull(keyFrom, "from can't be null"); - Objects.requireNonNull(keyTo, "to can't be null"); + Objects.requireNonNull(keyFrom, "keyFrom can't be null"); + Objects.requireNonNull(keyTo, "keyTo can't be null"); final NextIteratorFunction, V, ReadOnlySessionStore> nextIteratorFunction = store -> store.fetch( keyFrom, keyTo); return new DelegatingPeekingKeyValueIterator<>(storeName, @@ -233,8 +233,8 @@ public KeyValueIterator, V> fetch(final K keyFrom, final K keyTo) { @Override public KeyValueIterator, V> backwardFetch(final K keyFrom, final K keyTo) { - Objects.requireNonNull(keyFrom, "from can't be null"); - Objects.requireNonNull(keyTo, "to can't be null"); + Objects.requireNonNull(keyFrom, "keyFrom can't be null"); + Objects.requireNonNull(keyTo, "keyTo can't be null"); final NextIteratorFunction, V, ReadOnlySessionStore> nextIteratorFunction = store -> store.backwardFetch( keyFrom, keyTo); return new DelegatingPeekingKeyValueIterator<>( diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java index dff6bacc67c6a..0d8a5c41f0c6e 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java @@ -243,8 +243,8 @@ public KeyValueIterator, V> backwardFetch(final K key) { @Override public KeyValueIterator, V> fetch(final K keyFrom, final K keyTo) { - Objects.requireNonNull(keyFrom, "from cannot be null"); - Objects.requireNonNull(keyTo, "to cannot be null"); + Objects.requireNonNull(keyFrom, "keyFrom cannot be null"); + Objects.requireNonNull(keyTo, "keyTo cannot be null"); return new MeteredWindowedKeyValueIterator<>( wrapped().fetch(keyBytes(keyFrom), keyBytes(keyTo)), fetchSensor, @@ -256,8 +256,8 @@ public KeyValueIterator, V> fetch(final K keyFrom, @Override public KeyValueIterator, V> backwardFetch(final K keyFrom, final K keyTo) { - Objects.requireNonNull(keyFrom, "from cannot be null"); - Objects.requireNonNull(keyTo, "to cannot be null"); + Objects.requireNonNull(keyFrom, "keyFrom cannot be null"); + Objects.requireNonNull(keyTo, "keyTo cannot be null"); return new MeteredWindowedKeyValueIterator<>( wrapped().backwardFetch(keyBytes(keyFrom), keyBytes(keyTo)), fetchSensor, diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBSessionStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBSessionStore.java index c7b8a29ab515d..f5d710894f767 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBSessionStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBSessionStore.java @@ -83,9 +83,14 @@ public KeyValueIterator, byte[]> backwardFindSessions(final Byte } @Override - public byte[] fetchSession(final Bytes key, final long earliestSessionEndTime, final long latestSessionStartTime) { - return wrapped().get(SessionKeySchema.toBinary(key, earliestSessionEndTime, - latestSessionStartTime)); + public byte[] fetchSession(final Bytes key, + final long earliestSessionEndTime, + final long latestSessionStartTime) { + return wrapped().get(SessionKeySchema.toBinary( + key, + earliestSessionEndTime, + latestSessionStartTime + )); } @Override From 0c5fdb02f0b40180f1be1e4911c6b28b544b05ab Mon Sep 17 00:00:00 2001 From: Jorge Esteban Quilcate Otoya Date: Thu, 6 May 2021 17:56:55 +0100 Subject: [PATCH 5/6] align format --- .../kafka/streams/state/internals/MeteredSessionStore.java | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java index 0d8a5c41f0c6e..1fbc8db095799 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java @@ -205,8 +205,11 @@ public V fetchSession(final K key, final long earliestSessionEndTime, final long return maybeMeasureLatency( () -> { final Bytes bytesKey = keyBytes(key); - final byte[] result = wrapped().fetchSession(bytesKey, earliestSessionEndTime, - latestSessionStartTime); + final byte[] result = wrapped().fetchSession( + bytesKey, + earliestSessionEndTime, + latestSessionStartTime + ); if (result == null) { return null; } From 12fd9d6ef0bf3ddbc089250d597ea78bba98398b Mon Sep 17 00:00:00 2001 From: Jorge Esteban Quilcate Otoya Date: Fri, 7 May 2021 10:18:58 +0100 Subject: [PATCH 6/6] nit format --- .../state/internals/CompositeReadOnlySessionStore.java | 8 ++++---- .../streams/state/internals/InMemorySessionStore.java | 7 ++++--- 2 files changed, 8 insertions(+), 7 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/CompositeReadOnlySessionStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/CompositeReadOnlySessionStore.java index d5c1ac7ce74d2..fb5fb61ae6108 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/CompositeReadOnlySessionStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/CompositeReadOnlySessionStore.java @@ -223,8 +223,8 @@ public KeyValueIterator, V> backwardFetch(final K key) { public KeyValueIterator, V> fetch(final K keyFrom, final K keyTo) { Objects.requireNonNull(keyFrom, "keyFrom can't be null"); Objects.requireNonNull(keyTo, "keyTo can't be null"); - final NextIteratorFunction, V, ReadOnlySessionStore> nextIteratorFunction = store -> store.fetch( - keyFrom, keyTo); + final NextIteratorFunction, V, ReadOnlySessionStore> nextIteratorFunction = + store -> store.fetch(keyFrom, keyTo); return new DelegatingPeekingKeyValueIterator<>(storeName, new CompositeKeyValueIterator<>( storeProvider.stores(storeName, queryableStoreType).iterator(), @@ -235,8 +235,8 @@ public KeyValueIterator, V> fetch(final K keyFrom, final K keyTo) { public KeyValueIterator, V> backwardFetch(final K keyFrom, final K keyTo) { Objects.requireNonNull(keyFrom, "keyFrom can't be null"); Objects.requireNonNull(keyTo, "keyTo can't be null"); - final NextIteratorFunction, V, ReadOnlySessionStore> nextIteratorFunction = store -> store.backwardFetch( - keyFrom, keyTo); + final NextIteratorFunction, V, ReadOnlySessionStore> nextIteratorFunction = + store -> store.backwardFetch(keyFrom, keyTo); return new DelegatingPeekingKeyValueIterator<>( storeName, new CompositeKeyValueIterator<>( diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemorySessionStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemorySessionStore.java index 50b78d6c05078..affd47ac14292 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemorySessionStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemorySessionStore.java @@ -150,15 +150,16 @@ public void remove(final Windowed sessionKey) { } @Override - public byte[] fetchSession(final Bytes key, final long earliestSessionEndTime, final long latestSessionStartTime) { + public byte[] fetchSession(final Bytes key, + final long earliestSessionEndTime, + final long latestSessionStartTime) { removeExpiredSegments(); Objects.requireNonNull(key, "key cannot be null"); // Only need to search if the record hasn't expired yet if (latestSessionStartTime > observedStreamTime - retentionPeriod) { - final ConcurrentNavigableMap> keyMap = endTimeMap.get( - latestSessionStartTime); + final ConcurrentNavigableMap> keyMap = endTimeMap.get(latestSessionStartTime); if (keyMap != null) { final ConcurrentNavigableMap startTimeMap = keyMap.get(key); if (startTimeMap != null) {