diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregate.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregate.java index 1af1f48ef47af..bb86f52b20923 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregate.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregate.java @@ -92,8 +92,8 @@ public void process(final K key, final V value) { TimeWindow mergedWindow = newTimeWindow; T agg = initializer.apply(); - try (final KeyValueIterator, T> iterator = store.findSessionsToMerge(key, timestamp - windows.inactivityGap(), - timestamp + windows.inactivityGap())) { + try (final KeyValueIterator, T> iterator = store.findSessions(key, timestamp - windows.inactivityGap(), + timestamp + windows.inactivityGap())) { while (iterator.hasNext()) { final KeyValue, T> next = iterator.next(); merged.add(next); @@ -149,7 +149,7 @@ public void init(final ProcessorContext context) { @Override public T get(final Windowed key) { - try (KeyValueIterator, T> iter = store.findSessionsToMerge(key.key(), key.window().end(), key.window().end())) { + try (KeyValueIterator, T> iter = store.findSessions(key.key(), key.window().end(), key.window().end())) { if (!iter.hasNext()) { return null; } diff --git a/streams/src/main/java/org/apache/kafka/streams/state/SessionStore.java b/streams/src/main/java/org/apache/kafka/streams/state/SessionStore.java index 39658a32a363f..bb82f6dea2d54 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/SessionStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/SessionStore.java @@ -30,7 +30,7 @@ public interface SessionStore extends StateStore, ReadOnlySessionStore, AGG> findSessionsToMerge(final K key, long earliestSessionEndTime, final long latestSessionStartTime); + KeyValueIterator, AGG> findSessions(final K key, long earliestSessionEndTime, final long latestSessionStartTime); /** * Remove the session aggregated with provided {@link Windowed} key from the store diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingSessionStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingSessionStore.java index a012c6398cb4d..17c4ee0084684 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingSessionStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingSessionStore.java @@ -56,9 +56,9 @@ class CachingSessionStore implements SessionStore, CachedStateS this.keySchema = new SessionKeySchema(); } - public KeyValueIterator, AGG> findSessionsToMerge(final K key, - final long earliestSessionEndTime, - final long latestSessionStartTime) { + public KeyValueIterator, AGG> findSessions(final K key, + final long earliestSessionEndTime, + final long latestSessionStartTime) { validateStoreOpen(); final Bytes binarySessionId = Bytes.wrap(keySerde.serializer().serialize(name, key)); final ThreadCache.MemoryLRUCacheBytesIterator cacheIterator = cache.range(name, @@ -89,7 +89,7 @@ public void put(final Windowed key, AGG value) { @Override public KeyValueIterator, AGG> fetch(final K key) { - return findSessionsToMerge(key, 0, Long.MAX_VALUE); + return findSessions(key, 0, Long.MAX_VALUE); } diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBSessionStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBSessionStore.java index 73c825cf50479..a8ddc7376f172 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBSessionStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBSessionStore.java @@ -47,7 +47,7 @@ class RocksDBSessionStore implements SessionStore { @SuppressWarnings("unchecked") @Override - public KeyValueIterator, AGG> findSessionsToMerge(final K key, final long earliestSessionEndTime, final long latestSessionStartTime) { + public KeyValueIterator, AGG> findSessions(final K key, final long earliestSessionEndTime, final long latestSessionStartTime) { final KeyValueIterator bytesIterator = bytesStore.fetch(Bytes.wrap(serdes.rawKey(key)), earliestSessionEndTime, latestSessionStartTime); return new SessionStoreIterator(bytesIterator, serdes); } @@ -100,7 +100,7 @@ public boolean isOpen() { @Override public KeyValueIterator, AGG> fetch(final K key) { - return findSessionsToMerge(key, 0, Long.MAX_VALUE); + return findSessions(key, 0, Long.MAX_VALUE); } private static class SessionStoreIterator implements KeyValueIterator, AGG> { diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java index c107c3eafaf51..ba955220e9e0b 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java @@ -125,7 +125,7 @@ public void shouldCreateSingleSessionWhenWithinGap() throws Exception { context.setTime(500); processor.process("john", "second"); - final KeyValueIterator, Long> values = sessionStore.findSessionsToMerge("john", 0, 2000); + final KeyValueIterator, Long> values = sessionStore.findSessions("john", 0, 2000); assertTrue(values.hasNext()); assertEquals(Long.valueOf(2), values.next().value); } @@ -136,19 +136,19 @@ public void shouldMergeSessions() throws Exception { context.setTime(0); final String sessionId = "mel"; processor.process(sessionId, "first"); - assertTrue(sessionStore.findSessionsToMerge(sessionId, 0, 0).hasNext()); + assertTrue(sessionStore.findSessions(sessionId, 0, 0).hasNext()); // move time beyond gap context.setTime(GAP_MS + 1); processor.process(sessionId, "second"); - assertTrue(sessionStore.findSessionsToMerge(sessionId, GAP_MS + 1, GAP_MS + 1).hasNext()); + assertTrue(sessionStore.findSessions(sessionId, GAP_MS + 1, GAP_MS + 1).hasNext()); // should still exist as not within gap - assertTrue(sessionStore.findSessionsToMerge(sessionId, 0, 0).hasNext()); + assertTrue(sessionStore.findSessions(sessionId, 0, 0).hasNext()); // move time back context.setTime(GAP_MS / 2); processor.process(sessionId, "third"); - final KeyValueIterator, Long> iterator = sessionStore.findSessionsToMerge(sessionId, 0, GAP_MS + 1); + final KeyValueIterator, Long> iterator = sessionStore.findSessions(sessionId, 0, GAP_MS + 1); final KeyValue, Long> kv = iterator.next(); assertEquals(Long.valueOf(3), kv.value); @@ -160,7 +160,7 @@ public void shouldUpdateSessionIfTheSameTime() throws Exception { context.setTime(0); processor.process("mel", "first"); processor.process("mel", "second"); - final KeyValueIterator, Long> iterator = sessionStore.findSessionsToMerge("mel", 0, 0); + final KeyValueIterator, Long> iterator = sessionStore.findSessions("mel", 0, 0); assertEquals(Long.valueOf(2L), iterator.next().value); assertFalse(iterator.hasNext()); } @@ -196,14 +196,14 @@ public void shouldRemoveMergedSessionsFromStateStore() throws Exception { processor.process("a", "1"); // first ensure it is in the store - final KeyValueIterator, Long> a1 = sessionStore.findSessionsToMerge("a", 0, 0); + final KeyValueIterator, Long> a1 = sessionStore.findSessions("a", 0, 0); assertEquals(KeyValue.pair(new Windowed<>("a", new TimeWindow(0, 0)), 1L), a1.next()); context.setTime(100); processor.process("a", "2"); // a1 from above should have been removed // should have merged session in store - final KeyValueIterator, Long> a2 = sessionStore.findSessionsToMerge("a", 0, 100); + final KeyValueIterator, Long> a2 = sessionStore.findSessions("a", 0, 100); assertEquals(KeyValue.pair(new Windowed<>("a", new TimeWindow(0, 100)), 2L), a2.next()); assertFalse(a2.hasNext()); } diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingSessionStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingSessionStoreTest.java index cb6f87e8e75bc..d4533164678fe 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingSessionStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingSessionStoreTest.java @@ -72,8 +72,8 @@ public void shouldPutFetchFromCache() throws Exception { cachingStore.put(new Windowed<>("aa", new TimeWindow(0, 0)), 1L); cachingStore.put(new Windowed<>("b", new TimeWindow(0, 0)), 1L); - final KeyValueIterator, Long> a = cachingStore.findSessionsToMerge("a", 0, 0); - final KeyValueIterator, Long> b = cachingStore.findSessionsToMerge("b", 0, 0); + final KeyValueIterator, Long> a = cachingStore.findSessions("a", 0, 0); + final KeyValueIterator, Long> b = cachingStore.findSessions("b", 0, 0); assertEquals(KeyValue.pair(new Windowed<>("a", new TimeWindow(0, 0)), 1L), a.next()); assertEquals(KeyValue.pair(new Windowed<>("b", new TimeWindow(0, 0)), 1L), b.next()); @@ -115,7 +115,7 @@ public void shouldFlushItemsToStoreOnEviction() throws Exception { @Test public void shouldQueryItemsInCacheAndStore() throws Exception { final List, Long>> added = addSessionsUntilOverflow("a"); - final KeyValueIterator, Long> iterator = cachingStore.findSessionsToMerge("a", 0, added.size() * 10); + final KeyValueIterator, Long> iterator = cachingStore.findSessions("a", 0, added.size() * 10); final List, Long>> actual = toList(iterator); assertEquals(added, actual); } @@ -129,7 +129,7 @@ public void shouldRemove() throws Exception { cachingStore.flush(); cachingStore.remove(a); cachingStore.flush(); - final KeyValueIterator, Long> rangeIter = cachingStore.findSessionsToMerge("a", 0, 0); + final KeyValueIterator, Long> rangeIter = cachingStore.findSessions("a", 0, 0); assertFalse(rangeIter.hasNext()); } @@ -142,7 +142,7 @@ public void shouldFetchCorrectlyAcrossSegments() throws Exception { cachingStore.put(a2, 2L); cachingStore.put(a3, 3L); cachingStore.flush(); - final KeyValueIterator, Long> results = cachingStore.findSessionsToMerge("a", 0, Segments.MIN_SEGMENT_INTERVAL * 2); + final KeyValueIterator, Long> results = cachingStore.findSessions("a", 0, Segments.MIN_SEGMENT_INTERVAL * 2); assertEquals(a1, results.next().key); assertEquals(a2, results.next().key); assertEquals(a3, results.next().key); @@ -167,7 +167,7 @@ public void shouldThrowIfTryingToFetchFromClosedCachingStore() throws Exception @Test(expected = InvalidStateStoreException.class) public void shouldThrowIfTryingToFindMergeSessionFromClosedCachingStore() throws Exception { cachingStore.close(); - cachingStore.findSessionsToMerge("a", 0, Long.MAX_VALUE); + cachingStore.findSessions("a", 0, Long.MAX_VALUE); } @Test(expected = InvalidStateStoreException.class) diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSessionStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSessionStoreTest.java index 11766c75db863..a664e3b8bbca7 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSessionStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSessionStoreTest.java @@ -78,7 +78,7 @@ public void shouldPutAndFindSessionsInRange() throws Exception { final List, Long>> expected = Arrays.asList(KeyValue.pair(a1, 1L), KeyValue.pair(a2, 2L)); - final KeyValueIterator, Long> values = sessionStore.findSessionsToMerge(key, 0, 1000L); + final KeyValueIterator, Long> values = sessionStore.findSessions(key, 0, 1000L); assertEquals(expected, toList(values)); } @@ -107,7 +107,7 @@ public void shouldFindValuesWithinMergingSessionWindowRange() throws Exception { final String key = "a"; sessionStore.put(new Windowed<>(key, new TimeWindow(0L, 0L)), 1L); sessionStore.put(new Windowed<>(key, new TimeWindow(1000L, 1000L)), 2L); - final KeyValueIterator, Long> results = sessionStore.findSessionsToMerge(key, -1, 1000L); + final KeyValueIterator, Long> results = sessionStore.findSessions(key, -1, 1000L); final List, Long>> expected = Arrays.asList( KeyValue.pair(new Windowed<>(key, new TimeWindow(0L, 0L)), 1L), @@ -121,9 +121,9 @@ public void shouldRemove() throws Exception { sessionStore.put(new Windowed<>("a", new TimeWindow(1500, 2500)), 2L); sessionStore.remove(new Windowed<>("a", new TimeWindow(0, 1000))); - assertFalse(sessionStore.findSessionsToMerge("a", 0, 1000L).hasNext()); + assertFalse(sessionStore.findSessions("a", 0, 1000L).hasNext()); - assertTrue(sessionStore.findSessionsToMerge("a", 1500, 2500).hasNext()); + assertTrue(sessionStore.findSessions("a", 1500, 2500).hasNext()); } @Test @@ -138,7 +138,7 @@ public void shouldFindSessionsToMerge() throws Exception { sessionStore.put(session3, 3L); sessionStore.put(session4, 4L); sessionStore.put(session5, 5L); - final KeyValueIterator, Long> results = sessionStore.findSessionsToMerge("a", 150, 300); + final KeyValueIterator, Long> results = sessionStore.findSessions("a", 150, 300); assertEquals(session2, results.next().key); assertEquals(session3, results.next().key); assertFalse(results.hasNext());