Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -273,8 +273,8 @@ public void put(final Windowed<K> sessionKey,
}

@Override
public AGG fetchSession(final K key, final long startTime, final long endTime) {
return wrapped().fetchSession(key, startTime, endTime);
public AGG fetchSession(final K key, final long earliestSessionEndTime, final long latestSessionStartTime) {
return wrapped().fetchSession(key, earliestSessionEndTime, latestSessionStartTime);
}

@Override
Expand All @@ -283,9 +283,9 @@ public KeyValueIterator<Windowed<K>, AGG> fetch(final K key) {
}

@Override
public KeyValueIterator<Windowed<K>, AGG> fetch(final K from,
final K to) {
return wrapped().fetch(from, to);
public KeyValueIterator<Windowed<K>, AGG> fetch(final K keyFrom,
final K keyTo) {
return wrapped().fetch(keyFrom, keyTo);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -265,9 +265,9 @@ public void put(final Windowed<K> sessionKey,

@Override
public AGG fetchSession(final K key,
final long startTime,
final long endTime) {
return wrapped().fetchSession(key, startTime, endTime);
final long earliestSessionEndTime,
final long latestSessionStartTime) {
return wrapped().fetchSession(key, earliestSessionEndTime, latestSessionStartTime);
}

@Override
Expand All @@ -276,9 +276,9 @@ public KeyValueIterator<Windowed<K>, AGG> fetch(final K key) {
}

@Override
public KeyValueIterator<Windowed<K>, AGG> fetch(final K from,
final K to) {
return wrapped().fetch(from, to);
public KeyValueIterator<Windowed<K>, AGG> fetch(final K keyFrom,
final K keyTo) {
return wrapped().fetch(keyFrom, keyTo);
}
}
}

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -16,36 +16,101 @@
*/
package org.apache.kafka.streams.state;

import org.apache.kafka.streams.internals.ApiUtils;
import org.apache.kafka.streams.kstream.Window;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.processor.StateStore;

import java.time.Instant;

import static org.apache.kafka.streams.internals.ApiUtils.prepareMillisCheckFailMsgPrefix;

/**
* Interface for storing the aggregated values of sessions.
* <p>
* The key is internally represented as {@link Windowed Windowed&lt;K&gt;} that comprises the plain key
* and the {@link Window} that represents window start- and end-timestamp.
* The key is internally represented as {@link Windowed Windowed&lt;K&gt;} that comprises the plain
* key and the {@link Window} that represents window start- and end-timestamp.
* <p>
* If two sessions are merged, a new session with new start- and end-timestamp must be inserted into the store
* while the two old sessions must be deleted.
* If two sessions are merged, a new session with new start- and end-timestamp must be inserted into
* the store while the two old sessions must be deleted.
*
* @param <K> type of the record keys
* @param <AGG> type of the aggregated values
*/
public interface SessionStore<K, AGG> extends StateStore, ReadOnlySessionStore<K, AGG> {

@Override
default KeyValueIterator<Windowed<K>, AGG> findSessions(final K key,
final Instant earliestSessionEndTime,
final Instant latestSessionStartTime) {
return findSessions(
key,
ApiUtils.validateMillisecondInstant(earliestSessionEndTime,
prepareMillisCheckFailMsgPrefix(earliestSessionEndTime, "earliestSessionEndTime")),
ApiUtils.validateMillisecondInstant(latestSessionStartTime,
prepareMillisCheckFailMsgPrefix(latestSessionStartTime, "latestSessionStartTime")));
}

@Override
default KeyValueIterator<Windowed<K>, AGG> backwardFindSessions(final K key,
final Instant earliestSessionEndTime,
final Instant latestSessionStartTime) {
return backwardFindSessions(
key,
ApiUtils.validateMillisecondInstant(earliestSessionEndTime,
prepareMillisCheckFailMsgPrefix(earliestSessionEndTime, "earliestSessionEndTime")),
ApiUtils.validateMillisecondInstant(latestSessionStartTime,
prepareMillisCheckFailMsgPrefix(latestSessionStartTime, "latestSessionStartTime")));
}

default KeyValueIterator<Windowed<K>, AGG> findSessions(final K keyFrom,
final K keyTo,
final Instant earliestSessionEndTime,
final Instant latestSessionStartTime) {
return findSessions(
keyFrom,
keyTo,
ApiUtils.validateMillisecondInstant(earliestSessionEndTime,
prepareMillisCheckFailMsgPrefix(earliestSessionEndTime, "earliestSessionEndTime")),
ApiUtils.validateMillisecondInstant(latestSessionStartTime,
prepareMillisCheckFailMsgPrefix(latestSessionStartTime, "latestSessionStartTime")));
}

default KeyValueIterator<Windowed<K>, AGG> backwardFindSessions(final K keyFrom,
final K keyTo,
final Instant earliestSessionEndTime,
final Instant latestSessionStartTime) {
return backwardFindSessions(
keyFrom,
keyTo,
ApiUtils.validateMillisecondInstant(earliestSessionEndTime,
prepareMillisCheckFailMsgPrefix(earliestSessionEndTime, "earliestSessionEndTime")),
ApiUtils.validateMillisecondInstant(latestSessionStartTime,
prepareMillisCheckFailMsgPrefix(latestSessionStartTime, "latestSessionStartTime")));
}

default AGG fetchSession(final K key, final Instant earliestSessionEndTime, final Instant latestSessionStartTime) {
return fetchSession(key,
ApiUtils.validateMillisecondInstant(earliestSessionEndTime,
prepareMillisCheckFailMsgPrefix(earliestSessionEndTime, "startTime")),
ApiUtils.validateMillisecondInstant(latestSessionStartTime,
prepareMillisCheckFailMsgPrefix(latestSessionStartTime, "endTime")));
}

/**
* Remove the session aggregated with provided {@link Windowed} key from the store
*
* @param sessionKey key of the session to remove
* @throws NullPointerException If null is used for sessionKey.
*/
void remove(final Windowed<K> sessionKey);

/**
* Write the aggregated value for the provided key to the store
*
* @param sessionKey key of the session to write
* @param aggregate the aggregated value for the session, it can be null;
* if the serialized bytes are also null it is interpreted as deletes
* @param aggregate the aggregated value for the session, it can be null; if the serialized
* bytes are also null it is interpreted as deletes
* @throws NullPointerException If null is used for sessionKey.
*/
void put(final Windowed<K> sessionKey, final AGG aggregate);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -271,17 +271,18 @@ public KeyValueIterator<Windowed<Bytes>, byte[]> backwardFindSessions(final Byte
}

@Override
public byte[] fetchSession(final Bytes key, final long startTime, final long endTime) {
public byte[] fetchSession(final Bytes key, final long earliestSessionEndTime, final long latestSessionStartTime) {
Objects.requireNonNull(key, "key cannot be null");
validateStoreOpen();
if (context.cache() == null) {
return wrapped().fetchSession(key, startTime, endTime);
return wrapped().fetchSession(key, earliestSessionEndTime, latestSessionStartTime);
} else {
final Bytes bytesKey = SessionKeySchema.toBinary(key, startTime, endTime);
final Bytes bytesKey = SessionKeySchema.toBinary(key, earliestSessionEndTime,
latestSessionStartTime);
final Bytes cacheKey = cacheFunction.cacheKey(bytesKey);
final LRUCacheEntry entry = context.cache().get(cacheName, cacheKey);
if (entry == null) {
return wrapped().fetchSession(key, startTime, endTime);
return wrapped().fetchSession(key, earliestSessionEndTime, latestSessionStartTime);
} else {
return entry.value();
}
Expand All @@ -301,19 +302,19 @@ public KeyValueIterator<Windowed<Bytes>, byte[]> backwardFetch(final Bytes key)
}

@Override
public KeyValueIterator<Windowed<Bytes>, byte[]> fetch(final Bytes from,
final Bytes to) {
Objects.requireNonNull(from, "from cannot be null");
Objects.requireNonNull(to, "to cannot be null");
return findSessions(from, to, 0, Long.MAX_VALUE);
public KeyValueIterator<Windowed<Bytes>, byte[]> fetch(final Bytes keyFrom,
final Bytes keyTo) {
Objects.requireNonNull(keyFrom, "keyFrom cannot be null");
Objects.requireNonNull(keyTo, "keyTo cannot be null");
return findSessions(keyFrom, keyTo, 0, Long.MAX_VALUE);
}

@Override
public KeyValueIterator<Windowed<Bytes>, byte[]> backwardFetch(final Bytes from,
final Bytes to) {
Objects.requireNonNull(from, "from cannot be null");
Objects.requireNonNull(to, "to cannot be null");
return backwardFindSessions(from, to, 0, Long.MAX_VALUE);
public KeyValueIterator<Windowed<Bytes>, byte[]> backwardFetch(final Bytes keyFrom,
final Bytes keyTo) {
Objects.requireNonNull(keyFrom, "keyFrom cannot be null");
Objects.requireNonNull(keyTo, "keyTo cannot be null");
return backwardFindSessions(keyFrom, keyTo, 0, Long.MAX_VALUE);
}

public void flush() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,8 +91,8 @@ public void put(final Windowed<Bytes> sessionKey, final byte[] aggregate) {
}

@Override
public byte[] fetchSession(final Bytes key, final long startTime, final long endTime) {
return wrapped().fetchSession(key, startTime, endTime);
public byte[] fetchSession(final Bytes key, final long earliestSessionEndTime, final long latestSessionStartTime) {
return wrapped().fetchSession(key, earliestSessionEndTime, latestSessionStartTime);
}

@Override
Expand All @@ -106,12 +106,12 @@ public KeyValueIterator<Windowed<Bytes>, byte[]> fetch(final Bytes key) {
}

@Override
public KeyValueIterator<Windowed<Bytes>, byte[]> backwardFetch(final Bytes from, final Bytes to) {
return wrapped().backwardFetch(from, to);
public KeyValueIterator<Windowed<Bytes>, byte[]> backwardFetch(final Bytes keyFrom, final Bytes keyTo) {
return wrapped().backwardFetch(keyFrom, keyTo);
}

@Override
public KeyValueIterator<Windowed<Bytes>, byte[]> fetch(final Bytes from, final Bytes to) {
return wrapped().fetch(from, to);
public KeyValueIterator<Windowed<Bytes>, byte[]> fetch(final Bytes keyFrom, final Bytes keyTo) {
return wrapped().fetch(keyFrom, keyTo);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -155,12 +155,12 @@ public KeyValueIterator<Windowed<K>, V> backwardFindSessions(final K keyFrom,
}

@Override
public V fetchSession(final K key, final long startTime, final long endTime) {
public V fetchSession(final K key, final long earliestSessionEndTime, final long latestSessionStartTime) {
Objects.requireNonNull(key, "key can't be null");
final List<ReadOnlySessionStore<K, V>> stores = storeProvider.stores(storeName, queryableStoreType);
for (final ReadOnlySessionStore<K, V> store : stores) {
try {
return store.fetchSession(key, startTime, endTime);
return store.fetchSession(key, earliestSessionEndTime, latestSessionStartTime);
} catch (final InvalidStateStoreException ise) {
throw new InvalidStateStoreException(
"State store [" + storeName + "] is not available anymore" +
Expand Down Expand Up @@ -220,21 +220,23 @@ public KeyValueIterator<Windowed<K>, V> backwardFetch(final K key) {
}

@Override
public KeyValueIterator<Windowed<K>, V> fetch(final K from, final K to) {
Objects.requireNonNull(from, "from can't be null");
Objects.requireNonNull(to, "to can't be null");
final NextIteratorFunction<Windowed<K>, V, ReadOnlySessionStore<K, V>> nextIteratorFunction = store -> store.fetch(from, to);
public KeyValueIterator<Windowed<K>, V> fetch(final K keyFrom, final K keyTo) {
Objects.requireNonNull(keyFrom, "keyFrom can't be null");
Objects.requireNonNull(keyTo, "keyTo can't be null");
final NextIteratorFunction<Windowed<K>, V, ReadOnlySessionStore<K, V>> nextIteratorFunction =
store -> store.fetch(keyFrom, keyTo);
return new DelegatingPeekingKeyValueIterator<>(storeName,
new CompositeKeyValueIterator<>(
storeProvider.stores(storeName, queryableStoreType).iterator(),
nextIteratorFunction));
}

@Override
public KeyValueIterator<Windowed<K>, V> backwardFetch(final K from, final K to) {
Objects.requireNonNull(from, "from can't be null");
Objects.requireNonNull(to, "to can't be null");
final NextIteratorFunction<Windowed<K>, V, ReadOnlySessionStore<K, V>> nextIteratorFunction = store -> store.backwardFetch(from, to);
public KeyValueIterator<Windowed<K>, V> backwardFetch(final K keyFrom, final K keyTo) {
Objects.requireNonNull(keyFrom, "keyFrom can't be null");
Objects.requireNonNull(keyTo, "keyTo can't be null");
final NextIteratorFunction<Windowed<K>, V, ReadOnlySessionStore<K, V>> nextIteratorFunction =
store -> store.backwardFetch(keyFrom, keyTo);
return new DelegatingPeekingKeyValueIterator<>(
storeName,
new CompositeKeyValueIterator<>(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -150,18 +150,20 @@ public void remove(final Windowed<Bytes> sessionKey) {
}

@Override
public byte[] fetchSession(final Bytes key, final long startTime, final long endTime) {
public byte[] fetchSession(final Bytes key,
final long earliestSessionEndTime,
final long latestSessionStartTime) {
removeExpiredSegments();

Objects.requireNonNull(key, "key cannot be null");

// Only need to search if the record hasn't expired yet
if (endTime > observedStreamTime - retentionPeriod) {
final ConcurrentNavigableMap<Bytes, ConcurrentNavigableMap<Long, byte[]>> keyMap = endTimeMap.get(endTime);
if (latestSessionStartTime > observedStreamTime - retentionPeriod) {
final ConcurrentNavigableMap<Bytes, ConcurrentNavigableMap<Long, byte[]>> keyMap = endTimeMap.get(latestSessionStartTime);
if (keyMap != null) {
final ConcurrentNavigableMap<Long, byte[]> startTimeMap = keyMap.get(key);
if (startTimeMap != null) {
return startTimeMap.get(startTime);
return startTimeMap.get(earliestSessionEndTime);
}
}
}
Expand Down Expand Up @@ -273,25 +275,26 @@ public KeyValueIterator<Windowed<Bytes>, byte[]> backwardFetch(final Bytes key)
}

@Override
public KeyValueIterator<Windowed<Bytes>, byte[]> fetch(final Bytes from, final Bytes to) {
public KeyValueIterator<Windowed<Bytes>, byte[]> fetch(final Bytes keyFrom, final Bytes keyTo) {

Objects.requireNonNull(from, "from key cannot be null");
Objects.requireNonNull(to, "to key cannot be null");
Objects.requireNonNull(keyFrom, "from key cannot be null");
Objects.requireNonNull(keyTo, "to key cannot be null");

removeExpiredSegments();


return registerNewIterator(from, to, Long.MAX_VALUE, endTimeMap.entrySet().iterator(), false);
return registerNewIterator(keyFrom, keyTo, Long.MAX_VALUE, endTimeMap.entrySet().iterator(), false);
}

@Override
public KeyValueIterator<Windowed<Bytes>, byte[]> backwardFetch(final Bytes from, final Bytes to) {
Objects.requireNonNull(from, "from key cannot be null");
Objects.requireNonNull(to, "to key cannot be null");
public KeyValueIterator<Windowed<Bytes>, byte[]> backwardFetch(final Bytes keyFrom, final Bytes keyTo) {
Objects.requireNonNull(keyFrom, "from key cannot be null");
Objects.requireNonNull(keyTo, "to key cannot be null");

removeExpiredSegments();

return registerNewIterator(from, to, Long.MAX_VALUE, endTimeMap.descendingMap().entrySet().iterator(), true);
return registerNewIterator(
keyFrom, keyTo, Long.MAX_VALUE, endTimeMap.descendingMap().entrySet().iterator(), true);
}

@Override
Expand Down
Loading