-
Notifications
You must be signed in to change notification settings - Fork 15.1k
KAFKA-10648: Add Prefix Scan support to State Stores #9508
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
17be91a
4a41206
33be911
25980a0
a2ea513
dddad17
d2479a4
8eca3c9
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -16,6 +16,7 @@ | |
| */ | ||
| package org.apache.kafka.streams.state.internals; | ||
|
|
||
| import org.apache.kafka.common.serialization.Serializer; | ||
| import org.apache.kafka.common.utils.Bytes; | ||
| import org.apache.kafka.streams.KeyValue; | ||
| import org.apache.kafka.streams.processor.ProcessorContext; | ||
|
|
@@ -291,6 +292,16 @@ public KeyValueIterator<Bytes, byte[]> all() { | |
| return new MergedSortedCacheKeyValueBytesStoreIterator(cacheIterator, storeIterator, true); | ||
| } | ||
|
|
||
| @Override | ||
| public <PS extends Serializer<P>, P> KeyValueIterator<Bytes, byte[]> prefixScan(final P prefix, final PS prefixKeySerializer) { | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This method needs unit testing. Try to use a mock for the cache in the test.
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Similar comment as below. Unit tests are in CachingInMemoryKeyValueStoreTest which already extends AbstractKeyValueStoreTest and creates an in memory cache store.
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Oh, I see. I missed those. Sorry! That is fine then, although I think unit tests with mocks would be better.
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Actually, I had created another ticket to streamline tests for CachingKVStore: https://issues.apache.org/jira/browse/KAFKA-10788. @rohitrmd had volunteered to take this up. |
||
| validateStoreOpen(); | ||
| final KeyValueIterator<Bytes, byte[]> storeIterator = wrapped().prefixScan(prefix, prefixKeySerializer); | ||
| final Bytes from = Bytes.wrap(prefixKeySerializer.serialize(null, prefix)); | ||
| final Bytes to = Bytes.increment(from); | ||
| final ThreadCache.MemoryLRUCacheBytesIterator cacheIterator = context.cache().range(cacheName, from, to, false); | ||
| return new MergedSortedCacheKeyValueBytesStoreIterator(cacheIterator, storeIterator, true); | ||
| } | ||
|
|
||
| @Override | ||
| public KeyValueIterator<Bytes, byte[]> reverseAll() { | ||
| validateStoreOpen(); | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -16,6 +16,7 @@ | |
| */ | ||
| package org.apache.kafka.streams.state.internals; | ||
|
|
||
| import org.apache.kafka.common.serialization.Serializer; | ||
| import org.apache.kafka.common.utils.Bytes; | ||
| import org.apache.kafka.streams.KeyValue; | ||
| import org.apache.kafka.streams.processor.ProcessorContext; | ||
|
|
@@ -31,6 +32,7 @@ | |
| import java.util.Set; | ||
| import java.util.TreeMap; | ||
| import java.util.TreeSet; | ||
| import java.util.Objects; | ||
|
|
||
| public class InMemoryKeyValueStore implements KeyValueStore<Bytes, byte[]> { | ||
|
|
||
|
|
@@ -103,6 +105,20 @@ public void putAll(final List<KeyValue<Bytes, byte[]>> entries) { | |
| } | ||
| } | ||
|
|
||
| @Override | ||
| public <PS extends Serializer<P>, P> KeyValueIterator<Bytes, byte[]> prefixScan(final P prefix, final PS prefixKeySerializer) { | ||
|
vamossagar12 marked this conversation as resolved.
Outdated
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @vamossagar12 I can still not find the unit test for this method.
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. For this, do you want me to add the test cases here?https://github.com/apache/kafka/blob/17be91a37214bf77430c65d9300a5120e4348df9/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStoreTest.java There are tests in CachingInMemoryKeyValueStoreTest, which is where the tests for other methods like range etc have been added.
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think those tests never call the
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Here is the new ticket: https://issues.apache.org/jira/browse/KAFKA-12289 and the PR for the ticket: |
||
| Objects.requireNonNull(prefix, "prefix cannot be null"); | ||
| Objects.requireNonNull(prefixKeySerializer, "prefixKeySerializer cannot be null"); | ||
|
|
||
| final Bytes from = Bytes.wrap(prefixKeySerializer.serialize(null, prefix)); | ||
| final Bytes to = Bytes.increment(from); | ||
|
|
||
| return new DelegatingPeekingKeyValueIterator<>( | ||
| name, | ||
| new InMemoryKeyValueIterator(map.subMap(from, true, to, false).keySet(), true) | ||
| ); | ||
| } | ||
|
|
||
| @Override | ||
| public synchronized byte[] delete(final Bytes key) { | ||
| final byte[] oldValue = map.remove(key); | ||
|
|
||
Uh oh!
There was an error while loading. Please reload this page.