Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -92,8 +92,8 @@ public void process(final K key, final V value) {
TimeWindow mergedWindow = newTimeWindow;
T agg = initializer.apply();

try (final KeyValueIterator<Windowed<K>, T> iterator = store.findSessionsToMerge(key, timestamp - windows.inactivityGap(),
timestamp + windows.inactivityGap())) {
try (final KeyValueIterator<Windowed<K>, T> iterator = store.findSessions(key, timestamp - windows.inactivityGap(),
timestamp + windows.inactivityGap())) {
while (iterator.hasNext()) {
final KeyValue<Windowed<K>, T> next = iterator.next();
merged.add(next);
Expand Down Expand Up @@ -149,7 +149,7 @@ public void init(final ProcessorContext context) {

@Override
public T get(final Windowed<K> key) {
try (KeyValueIterator<Windowed<K>, T> iter = store.findSessionsToMerge(key.key(), key.window().end(), key.window().end())) {
try (KeyValueIterator<Windowed<K>, T> iter = store.findSessions(key.key(), key.window().end(), key.window().end())) {
if (!iter.hasNext()) {
return null;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ public interface SessionStore<K, AGG> extends StateStore, ReadOnlySessionStore<K
* Fetch any sessions with the matching key and the sessions end is &le earliestEndTime and the sessions
* start is &ge latestStartTime
*/
KeyValueIterator<Windowed<K>, AGG> findSessionsToMerge(final K key, long earliestSessionEndTime, final long latestSessionStartTime);
KeyValueIterator<Windowed<K>, AGG> findSessions(final K key, long earliestSessionEndTime, final long latestSessionStartTime);

/**
* Remove the session aggregated with provided {@link Windowed} key from the store
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,9 +56,9 @@ class CachingSessionStore<K, AGG> implements SessionStore<K, AGG>, CachedStateS
this.keySchema = new SessionKeySchema();
}

public KeyValueIterator<Windowed<K>, AGG> findSessionsToMerge(final K key,
final long earliestSessionEndTime,
final long latestSessionStartTime) {
public KeyValueIterator<Windowed<K>, AGG> findSessions(final K key,
final long earliestSessionEndTime,
final long latestSessionStartTime) {
validateStoreOpen();
final Bytes binarySessionId = Bytes.wrap(keySerde.serializer().serialize(name, key));
final ThreadCache.MemoryLRUCacheBytesIterator cacheIterator = cache.range(name,
Expand Down Expand Up @@ -89,7 +89,7 @@ public void put(final Windowed<K> key, AGG value) {

@Override
public KeyValueIterator<Windowed<K>, AGG> fetch(final K key) {
return findSessionsToMerge(key, 0, Long.MAX_VALUE);
return findSessions(key, 0, Long.MAX_VALUE);
}


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ class RocksDBSessionStore<K, AGG> implements SessionStore<K, AGG> {

@SuppressWarnings("unchecked")
@Override
public KeyValueIterator<Windowed<K>, AGG> findSessionsToMerge(final K key, final long earliestSessionEndTime, final long latestSessionStartTime) {
public KeyValueIterator<Windowed<K>, AGG> findSessions(final K key, final long earliestSessionEndTime, final long latestSessionStartTime) {
final KeyValueIterator<Bytes, byte[]> bytesIterator = bytesStore.fetch(Bytes.wrap(serdes.rawKey(key)), earliestSessionEndTime, latestSessionStartTime);
return new SessionStoreIterator(bytesIterator, serdes);
}
Expand Down Expand Up @@ -100,7 +100,7 @@ public boolean isOpen() {

@Override
public KeyValueIterator<Windowed<K>, AGG> fetch(final K key) {
return findSessionsToMerge(key, 0, Long.MAX_VALUE);
return findSessions(key, 0, Long.MAX_VALUE);
}

private static class SessionStoreIterator<K, AGG> implements KeyValueIterator<Windowed<K>, AGG> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ public void shouldCreateSingleSessionWhenWithinGap() throws Exception {
context.setTime(500);
processor.process("john", "second");

final KeyValueIterator<Windowed<String>, Long> values = sessionStore.findSessionsToMerge("john", 0, 2000);
final KeyValueIterator<Windowed<String>, Long> values = sessionStore.findSessions("john", 0, 2000);
assertTrue(values.hasNext());
assertEquals(Long.valueOf(2), values.next().value);
}
Expand All @@ -136,19 +136,19 @@ public void shouldMergeSessions() throws Exception {
context.setTime(0);
final String sessionId = "mel";
processor.process(sessionId, "first");
assertTrue(sessionStore.findSessionsToMerge(sessionId, 0, 0).hasNext());
assertTrue(sessionStore.findSessions(sessionId, 0, 0).hasNext());

// move time beyond gap
context.setTime(GAP_MS + 1);
processor.process(sessionId, "second");
assertTrue(sessionStore.findSessionsToMerge(sessionId, GAP_MS + 1, GAP_MS + 1).hasNext());
assertTrue(sessionStore.findSessions(sessionId, GAP_MS + 1, GAP_MS + 1).hasNext());
// should still exist as not within gap
assertTrue(sessionStore.findSessionsToMerge(sessionId, 0, 0).hasNext());
assertTrue(sessionStore.findSessions(sessionId, 0, 0).hasNext());
// move time back
context.setTime(GAP_MS / 2);
processor.process(sessionId, "third");

final KeyValueIterator<Windowed<String>, Long> iterator = sessionStore.findSessionsToMerge(sessionId, 0, GAP_MS + 1);
final KeyValueIterator<Windowed<String>, Long> iterator = sessionStore.findSessions(sessionId, 0, GAP_MS + 1);
final KeyValue<Windowed<String>, Long> kv = iterator.next();

assertEquals(Long.valueOf(3), kv.value);
Expand All @@ -160,7 +160,7 @@ public void shouldUpdateSessionIfTheSameTime() throws Exception {
context.setTime(0);
processor.process("mel", "first");
processor.process("mel", "second");
final KeyValueIterator<Windowed<String>, Long> iterator = sessionStore.findSessionsToMerge("mel", 0, 0);
final KeyValueIterator<Windowed<String>, Long> iterator = sessionStore.findSessions("mel", 0, 0);
assertEquals(Long.valueOf(2L), iterator.next().value);
assertFalse(iterator.hasNext());
}
Expand Down Expand Up @@ -196,14 +196,14 @@ public void shouldRemoveMergedSessionsFromStateStore() throws Exception {
processor.process("a", "1");

// first ensure it is in the store
final KeyValueIterator<Windowed<String>, Long> a1 = sessionStore.findSessionsToMerge("a", 0, 0);
final KeyValueIterator<Windowed<String>, Long> a1 = sessionStore.findSessions("a", 0, 0);
assertEquals(KeyValue.pair(new Windowed<>("a", new TimeWindow(0, 0)), 1L), a1.next());

context.setTime(100);
processor.process("a", "2");
// a1 from above should have been removed
// should have merged session in store
final KeyValueIterator<Windowed<String>, Long> a2 = sessionStore.findSessionsToMerge("a", 0, 100);
final KeyValueIterator<Windowed<String>, Long> a2 = sessionStore.findSessions("a", 0, 100);
assertEquals(KeyValue.pair(new Windowed<>("a", new TimeWindow(0, 100)), 2L), a2.next());
assertFalse(a2.hasNext());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,8 +72,8 @@ public void shouldPutFetchFromCache() throws Exception {
cachingStore.put(new Windowed<>("aa", new TimeWindow(0, 0)), 1L);
cachingStore.put(new Windowed<>("b", new TimeWindow(0, 0)), 1L);

final KeyValueIterator<Windowed<String>, Long> a = cachingStore.findSessionsToMerge("a", 0, 0);
final KeyValueIterator<Windowed<String>, Long> b = cachingStore.findSessionsToMerge("b", 0, 0);
final KeyValueIterator<Windowed<String>, Long> a = cachingStore.findSessions("a", 0, 0);
final KeyValueIterator<Windowed<String>, Long> b = cachingStore.findSessions("b", 0, 0);

assertEquals(KeyValue.pair(new Windowed<>("a", new TimeWindow(0, 0)), 1L), a.next());
assertEquals(KeyValue.pair(new Windowed<>("b", new TimeWindow(0, 0)), 1L), b.next());
Expand Down Expand Up @@ -115,7 +115,7 @@ public void shouldFlushItemsToStoreOnEviction() throws Exception {
@Test
public void shouldQueryItemsInCacheAndStore() throws Exception {
final List<KeyValue<Windowed<String>, Long>> added = addSessionsUntilOverflow("a");
final KeyValueIterator<Windowed<String>, Long> iterator = cachingStore.findSessionsToMerge("a", 0, added.size() * 10);
final KeyValueIterator<Windowed<String>, Long> iterator = cachingStore.findSessions("a", 0, added.size() * 10);
final List<KeyValue<Windowed<String>, Long>> actual = toList(iterator);
assertEquals(added, actual);
}
Expand All @@ -129,7 +129,7 @@ public void shouldRemove() throws Exception {
cachingStore.flush();
cachingStore.remove(a);
cachingStore.flush();
final KeyValueIterator<Windowed<String>, Long> rangeIter = cachingStore.findSessionsToMerge("a", 0, 0);
final KeyValueIterator<Windowed<String>, Long> rangeIter = cachingStore.findSessions("a", 0, 0);
assertFalse(rangeIter.hasNext());
}

Expand All @@ -142,7 +142,7 @@ public void shouldFetchCorrectlyAcrossSegments() throws Exception {
cachingStore.put(a2, 2L);
cachingStore.put(a3, 3L);
cachingStore.flush();
final KeyValueIterator<Windowed<String>, Long> results = cachingStore.findSessionsToMerge("a", 0, Segments.MIN_SEGMENT_INTERVAL * 2);
final KeyValueIterator<Windowed<String>, Long> results = cachingStore.findSessions("a", 0, Segments.MIN_SEGMENT_INTERVAL * 2);
assertEquals(a1, results.next().key);
assertEquals(a2, results.next().key);
assertEquals(a3, results.next().key);
Expand All @@ -167,7 +167,7 @@ public void shouldThrowIfTryingToFetchFromClosedCachingStore() throws Exception
@Test(expected = InvalidStateStoreException.class)
public void shouldThrowIfTryingToFindMergeSessionFromClosedCachingStore() throws Exception {
cachingStore.close();
cachingStore.findSessionsToMerge("a", 0, Long.MAX_VALUE);
cachingStore.findSessions("a", 0, Long.MAX_VALUE);
}

@Test(expected = InvalidStateStoreException.class)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ public void shouldPutAndFindSessionsInRange() throws Exception {
final List<KeyValue<Windowed<String>, Long>> expected
= Arrays.asList(KeyValue.pair(a1, 1L), KeyValue.pair(a2, 2L));

final KeyValueIterator<Windowed<String>, Long> values = sessionStore.findSessionsToMerge(key, 0, 1000L);
final KeyValueIterator<Windowed<String>, Long> values = sessionStore.findSessions(key, 0, 1000L);
assertEquals(expected, toList(values));
}

Expand Down Expand Up @@ -107,7 +107,7 @@ public void shouldFindValuesWithinMergingSessionWindowRange() throws Exception {
final String key = "a";
sessionStore.put(new Windowed<>(key, new TimeWindow(0L, 0L)), 1L);
sessionStore.put(new Windowed<>(key, new TimeWindow(1000L, 1000L)), 2L);
final KeyValueIterator<Windowed<String>, Long> results = sessionStore.findSessionsToMerge(key, -1, 1000L);
final KeyValueIterator<Windowed<String>, Long> results = sessionStore.findSessions(key, -1, 1000L);

final List<KeyValue<Windowed<String>, Long>> expected = Arrays.asList(
KeyValue.pair(new Windowed<>(key, new TimeWindow(0L, 0L)), 1L),
Expand All @@ -121,9 +121,9 @@ public void shouldRemove() throws Exception {
sessionStore.put(new Windowed<>("a", new TimeWindow(1500, 2500)), 2L);

sessionStore.remove(new Windowed<>("a", new TimeWindow(0, 1000)));
assertFalse(sessionStore.findSessionsToMerge("a", 0, 1000L).hasNext());
assertFalse(sessionStore.findSessions("a", 0, 1000L).hasNext());

assertTrue(sessionStore.findSessionsToMerge("a", 1500, 2500).hasNext());
assertTrue(sessionStore.findSessions("a", 1500, 2500).hasNext());
}

@Test
Expand All @@ -138,7 +138,7 @@ public void shouldFindSessionsToMerge() throws Exception {
sessionStore.put(session3, 3L);
sessionStore.put(session4, 4L);
sessionStore.put(session5, 5L);
final KeyValueIterator<Windowed<String>, Long> results = sessionStore.findSessionsToMerge("a", 150, 300);
final KeyValueIterator<Windowed<String>, Long> results = sessionStore.findSessions("a", 150, 300);
assertEquals(session2, results.next().key);
assertEquals(session3, results.next().key);
assertFalse(results.hasNext());
Expand Down