diff --git a/streams/src/main/java/org/apache/kafka/streams/state/Stores.java b/streams/src/main/java/org/apache/kafka/streams/state/Stores.java index d8b19fd20b373..46a9d45a0fc26 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/Stores.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/Stores.java @@ -104,7 +104,7 @@ public String name() { @Override public KeyValueStore get() { - return new InMemoryKeyValueStore<>(name, Serdes.Bytes(), Serdes.ByteArray()); + return new InMemoryKeyValueStore(name); } @Override diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/DelegatingPeekingKeyValueIterator.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/DelegatingPeekingKeyValueIterator.java index 673a7c9ba2780..20a434a9a2c4e 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/DelegatingPeekingKeyValueIterator.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/DelegatingPeekingKeyValueIterator.java @@ -25,7 +25,7 @@ /** * Optimized {@link KeyValueIterator} used when the same element could be peeked multiple times. */ -class DelegatingPeekingKeyValueIterator implements KeyValueIterator, PeekingKeyValueIterator { +public class DelegatingPeekingKeyValueIterator implements KeyValueIterator, PeekingKeyValueIterator { private final KeyValueIterator underlying; private final String storeName; private KeyValue next; diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStore.java index d6dd42ac264ff..cc28d6487bb39 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStore.java @@ -16,39 +16,27 @@ */ package org.apache.kafka.streams.state.internals; -import org.apache.kafka.common.serialization.Serde; +import java.util.List; +import org.apache.kafka.common.utils.Bytes; import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.processor.ProcessorContext; import org.apache.kafka.streams.processor.StateStore; -import org.apache.kafka.streams.processor.internals.ProcessorStateManager; import org.apache.kafka.streams.state.KeyValueIterator; import org.apache.kafka.streams.state.KeyValueStore; -import org.apache.kafka.streams.state.StateSerdes; import java.util.Iterator; -import java.util.List; import java.util.Map; import java.util.NavigableMap; import java.util.TreeMap; -public class InMemoryKeyValueStore implements KeyValueStore { +public class InMemoryKeyValueStore implements KeyValueStore { private final String name; - private final Serde keySerde; - private final Serde valueSerde; - private final NavigableMap map; + private final NavigableMap map; private volatile boolean open = false; - private StateSerdes serdes; - - public InMemoryKeyValueStore(final String name, - final Serde keySerde, - final Serde valueSerde) { + public InMemoryKeyValueStore(final String name) { this.name = name; - this.keySerde = keySerde; - this.valueSerde = valueSerde; - // TODO: when we have serde associated with class types, we can - // improve this situation by passing the comparator here. this.map = new TreeMap<>(); } @@ -61,20 +49,15 @@ public String name() { @SuppressWarnings("unchecked") public void init(final ProcessorContext context, final StateStore root) { - // construct the serde - this.serdes = new StateSerdes<>( - ProcessorStateManager.storeChangelogTopic(context.applicationId(), name), - keySerde == null ? (Serde) context.keySerde() : keySerde, - valueSerde == null ? (Serde) context.valueSerde() : valueSerde); if (root != null) { // register the store context.register(root, (key, value) -> { // this is a delete if (value == null) { - delete(serdes.keyFrom(key)); + delete(Bytes.wrap(key)); } else { - put(serdes.keyFrom(key), serdes.valueFrom(value)); + put(Bytes.wrap(key), value); } }); } @@ -93,13 +76,12 @@ public boolean isOpen() { } @Override - public synchronized V get(final K key) { + public synchronized byte[] get(final Bytes key) { return this.map.get(key); } @Override - public synchronized void put(final K key, - final V value) { + public synchronized void put(final Bytes key, final byte[] value) { if (value == null) { this.map.remove(key); } else { @@ -108,9 +90,8 @@ public synchronized void put(final K key, } @Override - public synchronized V putIfAbsent(final K key, - final V value) { - final V originalValue = get(key); + public synchronized byte[] putIfAbsent(final Bytes key, final byte[] value) { + final byte[] originalValue = get(key); if (originalValue == null) { put(key, value); } @@ -118,29 +99,29 @@ public synchronized V putIfAbsent(final K key, } @Override - public synchronized void putAll(final List> entries) { - for (final KeyValue entry : entries) { + public synchronized void putAll(final List> entries) { + for (final KeyValue entry : entries) { put(entry.key, entry.value); } } @Override - public synchronized V delete(final K key) { + public synchronized byte[] delete(final Bytes key) { return this.map.remove(key); } @Override - public synchronized KeyValueIterator range(final K from, - final K to) { + public synchronized KeyValueIterator range(final Bytes from, + final Bytes to) { return new DelegatingPeekingKeyValueIterator<>( name, - new InMemoryKeyValueIterator<>(this.map.subMap(from, true, to, true).entrySet().iterator())); + new InMemoryKeyValueIterator(this.map.subMap(from, true, to, true).entrySet().iterator())); } @Override - public synchronized KeyValueIterator all() { - final TreeMap copy = new TreeMap<>(this.map); - return new DelegatingPeekingKeyValueIterator<>(name, new InMemoryKeyValueIterator<>(copy.entrySet().iterator())); + public synchronized KeyValueIterator all() { + final TreeMap copy = new TreeMap<>(this.map); + return new DelegatingPeekingKeyValueIterator<>(name, new InMemoryKeyValueIterator(copy.entrySet().iterator())); } @Override @@ -159,10 +140,10 @@ public void close() { this.open = false; } - private static class InMemoryKeyValueIterator implements KeyValueIterator { - private final Iterator> iter; + private static class InMemoryKeyValueIterator implements KeyValueIterator { + private final Iterator> iter; - private InMemoryKeyValueIterator(final Iterator> iter) { + private InMemoryKeyValueIterator(final Iterator> iter) { this.iter = iter; } @@ -172,8 +153,8 @@ public boolean hasNext() { } @Override - public KeyValue next() { - final Map.Entry entry = iter.next(); + public KeyValue next() { + final Map.Entry entry = iter.next(); return new KeyValue<>(entry.getKey(), entry.getValue()); } @@ -188,7 +169,7 @@ public void close() { } @Override - public K peekNextKey() { + public Bytes peekNextKey() { throw new UnsupportedOperationException("peekNextKey() not supported in " + getClass().getName()); } } diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableReduceTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableReduceTest.java index 05b74dca9f45b..afb2cc12d94f5 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableReduceTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableReduceTest.java @@ -19,7 +19,8 @@ import org.apache.kafka.streams.processor.Processor; import org.apache.kafka.streams.processor.internals.AbstractProcessorContext; import org.apache.kafka.streams.processor.internals.ProcessorNode; -import org.apache.kafka.streams.state.internals.InMemoryKeyValueStore; +import org.apache.kafka.streams.state.KeyValueStore; +import org.apache.kafka.test.GenericInMemoryKeyValueStore; import org.apache.kafka.test.InternalMockProcessorContext; import org.junit.Test; @@ -44,9 +45,7 @@ public void shouldAddAndSubtract() { this::differenceNotNullArgs ).get(); - - final InMemoryKeyValueStore> myStore = - new InMemoryKeyValueStore<>("myStore", null, null); + final KeyValueStore> myStore = new GenericInMemoryKeyValueStore<>("myStore"); context.register(myStore, null); reduceProcessor.init(context); diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/KeyValueStoreMaterializerTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/KeyValueStoreMaterializerTest.java index bb1dec2587112..30080c3f03cbd 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/KeyValueStoreMaterializerTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/KeyValueStoreMaterializerTest.java @@ -17,7 +17,6 @@ package org.apache.kafka.streams.processor.internals; -import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.common.utils.Bytes; import org.apache.kafka.streams.kstream.Materialized; import org.apache.kafka.streams.kstream.internals.InternalNameProvider; @@ -107,7 +106,7 @@ public void shouldCreateBuilderThatBuildsStoreWithCachingAndLoggingDisabled() { @Test public void shouldCreateKeyValueStoreWithTheProvidedInnerStore() { final KeyValueBytesStoreSupplier supplier = EasyMock.createNiceMock(KeyValueBytesStoreSupplier.class); - final InMemoryKeyValueStore store = new InMemoryKeyValueStore<>("name", Serdes.Bytes(), Serdes.ByteArray()); + final InMemoryKeyValueStore store = new InMemoryKeyValueStore("name"); EasyMock.expect(supplier.name()).andReturn("name").anyTimes(); EasyMock.expect(supplier.get()).andReturn(store); EasyMock.replay(supplier); diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingKeyValueStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingKeyValueStoreTest.java index b1a64014bbec7..6c2b7cf4f2b1c 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingKeyValueStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingKeyValueStoreTest.java @@ -58,7 +58,7 @@ public class CachingKeyValueStoreTest extends AbstractKeyValueStoreTest { private final int maxCacheSizeBytes = 150; private InternalMockProcessorContext context; private CachingKeyValueStore store; - private InMemoryKeyValueStore underlyingStore; + private InMemoryKeyValueStore underlyingStore; private ThreadCache cache; private CacheFlushListenerStub cacheFlushListener; private String topic; @@ -66,7 +66,7 @@ public class CachingKeyValueStoreTest extends AbstractKeyValueStoreTest { @Before public void setUp() { final String storeName = "store"; - underlyingStore = new InMemoryKeyValueStore<>(storeName, Serdes.Bytes(), Serdes.ByteArray()); + underlyingStore = new InMemoryKeyValueStore(storeName); cacheFlushListener = new CacheFlushListenerStub<>(); store = new CachingKeyValueStore<>(underlyingStore, Serdes.String(), Serdes.String()); store.setFlushListener(cacheFlushListener, false); diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStoreTest.java index 5fdfd46ccd3f0..5645b8bc84a5e 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStoreTest.java @@ -43,7 +43,7 @@ public class ChangeLoggingKeyValueBytesStoreTest { private InternalMockProcessorContext context; - private final InMemoryKeyValueStore inner = new InMemoryKeyValueStore<>("kv", Serdes.Bytes(), Serdes.ByteArray()); + private final InMemoryKeyValueStore inner = new InMemoryKeyValueStore("kv"); private final ChangeLoggingKeyValueBytesStore store = new ChangeLoggingKeyValueBytesStore(inner); private final Map sent = new HashMap<>(); private final Bytes hi = Bytes.wrap("hi".getBytes()); diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/DelegatingPeekingKeyValueIteratorTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/DelegatingPeekingKeyValueIteratorTest.java index 8b6fc9574cefe..593b265aa02c3 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/DelegatingPeekingKeyValueIteratorTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/DelegatingPeekingKeyValueIteratorTest.java @@ -16,8 +16,9 @@ */ package org.apache.kafka.streams.state.internals; -import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.streams.KeyValue; +import org.apache.kafka.streams.state.KeyValueStore; +import org.apache.kafka.test.GenericInMemoryKeyValueStore; import org.junit.Before; import org.junit.Test; @@ -29,11 +30,11 @@ public class DelegatingPeekingKeyValueIteratorTest { private final String name = "name"; - private InMemoryKeyValueStore store; + private KeyValueStore store; @Before public void setUp() { - store = new InMemoryKeyValueStore<>(name, Serdes.String(), Serdes.String()); + store = new GenericInMemoryKeyValueStore<>(name); } @Test diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/FilteredCacheIteratorTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/FilteredCacheIteratorTest.java index 4a0796d3ea4e6..bf54786fff89f 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/FilteredCacheIteratorTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/FilteredCacheIteratorTest.java @@ -20,6 +20,8 @@ import org.apache.kafka.common.utils.Bytes; import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.state.KeyValueIterator; +import org.apache.kafka.streams.state.KeyValueStore; +import org.apache.kafka.test.GenericInMemoryKeyValueStore; import org.junit.Before; import org.junit.Test; @@ -47,7 +49,7 @@ public Bytes cacheKey(final Bytes key) { }; @SuppressWarnings("unchecked") - private final InMemoryKeyValueStore store = new InMemoryKeyValueStore("name", null, null); + private final KeyValueStore store = new GenericInMemoryKeyValueStore<>("my-store"); private final KeyValue firstEntry = KeyValue.pair(Bytes.wrap("a".getBytes()), new LRUCacheEntry("1".getBytes())); private final List> entries = asList( diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/MergedSortedCacheKeyValueBytesStoreIteratorTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/MergedSortedCacheKeyValueBytesStoreIteratorTest.java index d7f164cef5faf..4028b0cd901f3 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/MergedSortedCacheKeyValueBytesStoreIteratorTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/MergedSortedCacheKeyValueBytesStoreIteratorTest.java @@ -39,7 +39,7 @@ public class MergedSortedCacheKeyValueBytesStoreIteratorTest { @Before public void setUp() throws Exception { - store = new InMemoryKeyValueStore<>(namespace, Serdes.Bytes(), Serdes.ByteArray()); + store = new InMemoryKeyValueStore(namespace); cache = new ThreadCache(new LogContext("testCache "), 10000L, new MockStreamsMetrics(new Metrics())); } @@ -146,7 +146,7 @@ public void shouldSkipAllDeletedFromCache() throws Exception { @Test public void shouldPeekNextKey() throws Exception { - final KeyValueStore kv = new InMemoryKeyValueStore<>("one", Serdes.Bytes(), Serdes.ByteArray()); + final KeyValueStore kv = new InMemoryKeyValueStore("one"); final ThreadCache cache = new ThreadCache(new LogContext("testCache "), 1000000L, new MockStreamsMetrics(new Metrics())); final byte[][] bytes = {{0}, {1}, {2}, {3}, {4}, {5}, {6}, {7}, {8}, {9}, {10}}; for (int i = 0; i < bytes.length - 1; i += 2) { diff --git a/streams/src/test/java/org/apache/kafka/test/GenericInMemoryKeyValueStore.java b/streams/src/test/java/org/apache/kafka/test/GenericInMemoryKeyValueStore.java new file mode 100644 index 0000000000000..e9d20f1a4fbd4 --- /dev/null +++ b/streams/src/test/java/org/apache/kafka/test/GenericInMemoryKeyValueStore.java @@ -0,0 +1,174 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.test; + +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.NavigableMap; +import java.util.TreeMap; +import org.apache.kafka.streams.KeyValue; +import org.apache.kafka.streams.processor.ProcessorContext; +import org.apache.kafka.streams.processor.StateStore; +import org.apache.kafka.streams.state.KeyValueIterator; +import org.apache.kafka.streams.state.KeyValueStore; +import org.apache.kafka.streams.state.internals.DelegatingPeekingKeyValueIterator; + +/** + * This class is a generic version of the in-memory key-value store that is useful for testing when you + * need a basic KeyValueStore for arbitrary types and don't have/want to write a serde + */ +public class GenericInMemoryKeyValueStore implements KeyValueStore { + + private final String name; + private final NavigableMap map; + private volatile boolean open = false; + + public GenericInMemoryKeyValueStore(final String name) { + this.name = name; + + this.map = new TreeMap<>(); + } + + @Override + public String name() { + return this.name; + } + + @Override + @SuppressWarnings("unchecked") + /* This is a "dummy" store used for testing and does not support restoring from changelog since we allow it to be serde-ignorant */ + public void init(final ProcessorContext context, final StateStore root) { + if (root != null) { + context.register(root, null); + } + + this.open = true; + } + + @Override + public boolean persistent() { + return false; + } + + @Override + public boolean isOpen() { + return this.open; + } + + @Override + public synchronized V get(final K key) { + return this.map.get(key); + } + + @Override + public synchronized void put(final K key, + final V value) { + if (value == null) { + this.map.remove(key); + } else { + this.map.put(key, value); + } + } + + @Override + public synchronized V putIfAbsent(final K key, + final V value) { + final V originalValue = get(key); + if (originalValue == null) { + put(key, value); + } + return originalValue; + } + + @Override + public synchronized void putAll(final List> entries) { + for (final KeyValue entry : entries) { + put(entry.key, entry.value); + } + } + + @Override + public synchronized V delete(final K key) { + return this.map.remove(key); + } + + @Override + public synchronized KeyValueIterator range(final K from, + final K to) { + return new DelegatingPeekingKeyValueIterator<>( + name, + new GenericInMemoryKeyValueIterator<>(this.map.subMap(from, true, to, true).entrySet().iterator())); + } + + @Override + public synchronized KeyValueIterator all() { + final TreeMap copy = new TreeMap<>(this.map); + return new DelegatingPeekingKeyValueIterator<>(name, new GenericInMemoryKeyValueIterator<>(copy.entrySet().iterator())); + } + + @Override + public long approximateNumEntries() { + return this.map.size(); + } + + @Override + public void flush() { + // do-nothing since it is in-memory + } + + @Override + public void close() { + this.map.clear(); + this.open = false; + } + + private static class GenericInMemoryKeyValueIterator implements KeyValueIterator { + private final Iterator> iter; + + private GenericInMemoryKeyValueIterator(final Iterator> iter) { + this.iter = iter; + } + + @Override + public boolean hasNext() { + return iter.hasNext(); + } + + @Override + public KeyValue next() { + final Map.Entry entry = iter.next(); + return new KeyValue<>(entry.getKey(), entry.getValue()); + } + + @Override + public void remove() { + iter.remove(); + } + + @Override + public void close() { + // do nothing + } + + @Override + public K peekNextKey() { + throw new UnsupportedOperationException("peekNextKey() not supported in " + getClass().getName()); + } + } +} \ No newline at end of file diff --git a/streams/test-utils/src/test/java/org/apache/kafka/streams/MockProcessorContextTest.java b/streams/test-utils/src/test/java/org/apache/kafka/streams/MockProcessorContextTest.java index 41b62f9333341..32c479caaa1a4 100644 --- a/streams/test-utils/src/test/java/org/apache/kafka/streams/MockProcessorContextTest.java +++ b/streams/test-utils/src/test/java/org/apache/kafka/streams/MockProcessorContextTest.java @@ -27,7 +27,9 @@ import org.apache.kafka.streams.processor.TaskId; import org.apache.kafka.streams.processor.To; import org.apache.kafka.streams.state.KeyValueStore; -import org.apache.kafka.streams.state.internals.InMemoryKeyValueStore; + +import org.apache.kafka.streams.state.StoreBuilder; +import org.apache.kafka.streams.state.Stores; import org.junit.Test; import java.io.File; @@ -230,6 +232,7 @@ public void process(final String key, final Long value) { assertFalse(context.committed()); } + @SuppressWarnings("unchecked") @Test public void shouldStoreAndReturnStateStores() { final AbstractProcessor processor = new AbstractProcessor() { @@ -243,10 +246,16 @@ public void process(final String key, final Long value) { }; final MockProcessorContext context = new MockProcessorContext(); - final KeyValueStore store = new InMemoryKeyValueStore<>("my-state", Serdes.String(), Serdes.Long()); - context.register(store, null); + + final StoreBuilder storeBuilder = Stores.keyValueStoreBuilder( + Stores.inMemoryKeyValueStore("my-state"), + Serdes.String(), + Serdes.Long()).withLoggingDisabled(); + + final KeyValueStore store = (KeyValueStore) storeBuilder.build(); store.init(context, store); + processor.init(context); processor.process("foo", 5L);