Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -246,8 +246,8 @@ public void put(final Windowed<K> sessionKey,
}

@Override
public AGG fetchSession(final K key, final long startTime, final long endTime) {
return wrapped().fetchSession(key, startTime, endTime);
public AGG fetchSession(final K key, final long sessionStartTime, final long sessionEndTime) {
return wrapped().fetchSession(key, sessionStartTime, sessionEndTime);
}

@Override
Expand All @@ -256,9 +256,9 @@ public KeyValueIterator<Windowed<K>, AGG> fetch(final K key) {
}

@Override
public KeyValueIterator<Windowed<K>, AGG> fetch(final K from,
final K to) {
return wrapped().fetch(from, to);
public KeyValueIterator<Windowed<K>, AGG> fetch(final K keyFrom,
final K keyTo) {
return wrapped().fetch(keyFrom, keyTo);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -240,9 +240,9 @@ public void put(final Windowed<K> sessionKey,

@Override
public AGG fetchSession(final K key,
final long startTime,
final long endTime) {
return wrapped().fetchSession(key, startTime, endTime);
final long sessionStartTime,
final long sessionEndTime) {
return wrapped().fetchSession(key, sessionStartTime, sessionEndTime);
}

@Override
Expand All @@ -251,9 +251,9 @@ public KeyValueIterator<Windowed<K>, AGG> fetch(final K key) {
}

@Override
public KeyValueIterator<Windowed<K>, AGG> fetch(final K from,
final K to) {
return wrapped().fetch(from, to);
public KeyValueIterator<Windowed<K>, AGG> fetch(final K keyFrom,
final K keyTo) {
return wrapped().fetch(keyFrom, keyTo);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,40 +19,86 @@

import org.apache.kafka.streams.kstream.Windowed;

import java.time.Instant;

/**
* A session store that only supports read operations.
* Implementations should be thread-safe as concurrent reads and writes
* are expected.
*
* @param <K> the key type
* @param <K> the key type
* @param <AGG> the aggregated value type
*/
public interface ReadOnlySessionStore<K, AGG> {
/**
* Retrieve all aggregated sessions for the provided key.
* Fetch any sessions with the matching key and the sessions end is &ge; earliestSessionEndTime and the sessions
* start is &le; latestSessionStartTime
* <p>
* This iterator must be closed after use.
*
* @param key the key to return sessions for
* @param earliestSessionEndTime the end timestamp of the earliest session to search for
* @param latestSessionStartTime the end timestamp of the latest session to search for
* @return iterator of sessions with the matching key and aggregated values
* @throws NullPointerException if null is used for key.
*/
KeyValueIterator<Windowed<K>, AGG> findSessions(final K key,
final Instant earliestSessionEndTime,
final Instant latestSessionStartTime);

/**
* Fetch any sessions in the given range of keys and the sessions end is &ge; earliestSessionEndTime and the sessions
* start is &le; latestSessionStartTime
* <p>
* This iterator must be closed after use.
*
* @param keyFrom The first key that could be in the range
* @param keyTo The last key that could be in the range
* @param earliestSessionEndTime the end timestamp of the earliest session to search for
* @param latestSessionStartTime the end timestamp of the latest session to search for
* @return iterator of sessions with the matching keys and aggregated values
* @throws NullPointerException if null is used for any key.
*/
KeyValueIterator<Windowed<K>, AGG> findSessions(final K keyFrom,
final K keyTo,
final Instant earliestSessionEndTime,
final Instant latestSessionStartTime);

/**
* Get the value of key from a single session.
*
* @param key the key to fetch
* @param sessionStartTime start timestamp of the session
* @param sessionEndTime end timestamp of the session
* @return The value or {@code null} if no session associated with the key can be found
* @throws NullPointerException if {@code null} is used for any key.
*/
AGG fetchSession(final K key, final Instant sessionStartTime, final Instant sessionEndTime);

/**
* Retrieve all aggregated sessions for the provided key.
* This iterator must be closed after use.
* <p>
* For each key, the iterator guarantees ordering of sessions, starting from the oldest/earliest
* available session to the newest/latest session.
*
* @param key record key to find aggregated session values for
* @return KeyValueIterator containing all sessions for the provided key.
* @throws NullPointerException If null is used for key.
*
* @param key record key to find aggregated session values for
* @return KeyValueIterator containing all sessions for the provided key.
* @throws NullPointerException if null is used for key.
*/
KeyValueIterator<Windowed<K>, AGG> fetch(final K key);

/**
* Retrieve all aggregated sessions for the given range of keys.
* This iterator must be closed after use.
*
* <p>
* For each key, the iterator guarantees ordering of sessions, starting from the oldest/earliest
* available session to the newest/latest session.
*
* @param from first key in the range to find aggregated session values for
* @param to last key in the range to find aggregated session values for
* @return KeyValueIterator containing all sessions for the provided key.
* @throws NullPointerException If null is used for any of the keys.
* @param keyFrom first key in the range to find aggregated session values for
* @param keyTo last key in the range to find aggregated session values for
* @return KeyValueIterator containing all sessions for the provided key.
* @throws NullPointerException if null is used for any of the keys.
*/
KeyValueIterator<Windowed<K>, AGG> fetch(final K from, final K to);
KeyValueIterator<Windowed<K>, AGG> fetch(final K keyFrom, final K keyTo);
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,15 @@
*/
package org.apache.kafka.streams.state;

import org.apache.kafka.streams.internals.ApiUtils;
import org.apache.kafka.streams.kstream.Window;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.processor.StateStore;

import java.time.Instant;

import static org.apache.kafka.streams.internals.ApiUtils.prepareMillisCheckFailMsgPrefix;

/**
* Interface for storing the aggregated values of sessions.
* <p>
Expand All @@ -29,6 +34,9 @@
* If two sessions are merged, a new session with new start- and end-timestamp must be inserted into the store
* while the two old sessions must be deleted.
*
* Long-based time-ranges are used instead of Instant-based exposed in Read-only interface to avoid performance
* penalties caused by object allocation when using Stores via Processor API.
*
* @param <K> type of the record keys
* @param <AGG> type of the aggregated values
*/
Expand All @@ -37,56 +45,93 @@ public interface SessionStore<K, AGG> extends StateStore, ReadOnlySessionStore<K
/**
* Fetch any sessions with the matching key and the sessions end is &ge; earliestSessionEndTime and the sessions
* start is &le; latestSessionStartTime
*
* <p>
* This iterator must be closed after use.
*
* @param key the key to return sessions for
* @param key the key to return sessions for
* @param earliestSessionEndTime the end timestamp of the earliest session to search for
* @param latestSessionStartTime the end timestamp of the latest session to search for
* @return iterator of sessions with the matching key and aggregated values
* @throws NullPointerException If null is used for key.
*/
KeyValueIterator<Windowed<K>, AGG> findSessions(final K key, final long earliestSessionEndTime, final long latestSessionStartTime);
KeyValueIterator<Windowed<K>, AGG> findSessions(final K key,
final long earliestSessionEndTime,
final long latestSessionStartTime);

@Override
default KeyValueIterator<Windowed<K>, AGG> findSessions(final K key,
final Instant earliestSessionEndTime,
final Instant latestSessionStartTime) {
return findSessions(
key,
ApiUtils.validateMillisecondInstant(earliestSessionEndTime, prepareMillisCheckFailMsgPrefix(earliestSessionEndTime, "earliestSessionEndTime")),
ApiUtils.validateMillisecondInstant(latestSessionStartTime, prepareMillisCheckFailMsgPrefix(latestSessionStartTime, "latestSessionStartTime")));
}

/**
* Fetch any sessions in the given range of keys and the sessions end is &ge; earliestSessionEndTime and the sessions
* start is &le; latestSessionStartTime
*
* <p>
* This iterator must be closed after use.
*
* @param keyFrom The first key that could be in the range
* @param keyTo The last key that could be in the range
* @param keyFrom The first key that could be in the range
* @param keyTo The last key that could be in the range
* @param earliestSessionEndTime the end timestamp of the earliest session to search for
* @param latestSessionStartTime the end timestamp of the latest session to search for
* @return iterator of sessions with the matching keys and aggregated values
* @throws NullPointerException If null is used for any key.
*/
KeyValueIterator<Windowed<K>, AGG> findSessions(final K keyFrom, final K keyTo, final long earliestSessionEndTime, final long latestSessionStartTime);
KeyValueIterator<Windowed<K>, AGG> findSessions(final K keyFrom,
final K keyTo,
final long earliestSessionEndTime,
final long latestSessionStartTime);

@Override
default KeyValueIterator<Windowed<K>, AGG> findSessions(final K keyFrom,
final K keyTo,
final Instant earliestSessionEndTime,
final Instant latestSessionStartTime) {
return findSessions(
keyFrom,
keyTo,
ApiUtils.validateMillisecondInstant(earliestSessionEndTime, prepareMillisCheckFailMsgPrefix(earliestSessionEndTime, "earliestSessionEndTime")),
ApiUtils.validateMillisecondInstant(latestSessionStartTime, prepareMillisCheckFailMsgPrefix(latestSessionStartTime, "latestSessionStartTime")));
}

/**
* Get the value of key from a single session.
*
* @param key the key to fetch
* @param startTime start timestamp of the session
* @param endTime end timestamp of the session
* @param key the key to fetch
* @param sessionStartTime start timestamp of the session
* @param sessionEndTime end timestamp of the session
* @return The value or {@code null} if no session associated with the key can be found
* @throws NullPointerException If {@code null} is used for any key.
* @throws NullPointerException if {@code null} is used for any key.
*/
AGG fetchSession(final K key, final long startTime, final long endTime);
AGG fetchSession(final K key, final long sessionStartTime, final long sessionEndTime);

@Override
default AGG fetchSession(final K key, final Instant sessionStartTime, final Instant sessionEndTime) {
return fetchSession(
key,
ApiUtils.validateMillisecondInstant(sessionStartTime, prepareMillisCheckFailMsgPrefix(sessionStartTime, "sessionStartTime")),
ApiUtils.validateMillisecondInstant(sessionEndTime, prepareMillisCheckFailMsgPrefix(sessionEndTime, "sessionEndTime")));
}

/**
* Remove the session aggregated with provided {@link Windowed} key from the store
*
* @param sessionKey key of the session to remove
* @throws NullPointerException If null is used for sessionKey.
* @throws NullPointerException if null is used for sessionKey.
*/
void remove(final Windowed<K> sessionKey);

/**
* Write the aggregated value for the provided key to the store
*
* @param sessionKey key of the session to write
* @param aggregate the aggregated value for the session, it can be null;
* if the serialized bytes are also null it is interpreted as deletes
* @throws NullPointerException If null is used for sessionKey.
* @throws NullPointerException if null is used for sessionKey.
*/
void put(final Windowed<K> sessionKey, final AGG aggregate);
}
Original file line number Diff line number Diff line change
Expand Up @@ -200,17 +200,17 @@ public KeyValueIterator<Windowed<Bytes>, byte[]> findSessions(final Bytes keyFro
}

@Override
public byte[] fetchSession(final Bytes key, final long startTime, final long endTime) {
public byte[] fetchSession(final Bytes key, final long sessionStartTime, final long sessionEndTime) {
Objects.requireNonNull(key, "key cannot be null");
validateStoreOpen();
if (context.cache() == null) {
return wrapped().fetchSession(key, startTime, endTime);
return wrapped().fetchSession(key, sessionStartTime, sessionEndTime);
} else {
final Bytes bytesKey = SessionKeySchema.toBinary(key, startTime, endTime);
final Bytes bytesKey = SessionKeySchema.toBinary(key, sessionStartTime, sessionEndTime);
final Bytes cacheKey = cacheFunction.cacheKey(bytesKey);
final LRUCacheEntry entry = context.cache().get(cacheName, cacheKey);
if (entry == null) {
return wrapped().fetchSession(key, startTime, endTime);
return wrapped().fetchSession(key, sessionStartTime, sessionEndTime);
} else {
return entry.value();
}
Expand All @@ -224,11 +224,11 @@ public KeyValueIterator<Windowed<Bytes>, byte[]> fetch(final Bytes key) {
}

@Override
public KeyValueIterator<Windowed<Bytes>, byte[]> fetch(final Bytes from,
final Bytes to) {
Objects.requireNonNull(from, "from cannot be null");
Objects.requireNonNull(to, "to cannot be null");
return findSessions(from, to, 0, Long.MAX_VALUE);
public KeyValueIterator<Windowed<Bytes>, byte[]> fetch(final Bytes keyFrom,
final Bytes keyTo) {
Objects.requireNonNull(keyFrom, "from cannot be null");
Objects.requireNonNull(keyTo, "to cannot be null");
return findSessions(keyFrom, keyTo, 0, Long.MAX_VALUE);
}

public void flush() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,8 +67,8 @@ public void put(final Windowed<Bytes> sessionKey, final byte[] aggregate) {
}

@Override
public byte[] fetchSession(final Bytes key, final long startTime, final long endTime) {
return wrapped().fetchSession(key, startTime, endTime);
public byte[] fetchSession(final Bytes key, final long sessionStartTime, final long sessionEndTime) {
return wrapped().fetchSession(key, sessionStartTime, sessionEndTime);
}

@Override
Expand All @@ -77,7 +77,7 @@ public KeyValueIterator<Windowed<Bytes>, byte[]> fetch(final Bytes key) {
}

@Override
public KeyValueIterator<Windowed<Bytes>, byte[]> fetch(final Bytes from, final Bytes to) {
return wrapped().fetch(from, to);
public KeyValueIterator<Windowed<Bytes>, byte[]> fetch(final Bytes keyFrom, final Bytes keyTo) {
return wrapped().fetch(keyFrom, keyTo);
}
}
Loading