From c39ae20efb4217552a23ade44b04bd0355077253 Mon Sep 17 00:00:00 2001 From: ableegoldman Date: Tue, 19 Feb 2019 16:26:14 -0800 Subject: [PATCH 1/4] Inlined generic parameters for in-memory key-value byte store and fixed any tests relying on using generic parameters --- .../apache/kafka/streams/state/Stores.java | 2 +- .../internals/InMemoryKeyValueStore.java | 71 ++++------ .../kstream/internals/KTableReduceTest.java | 134 +++++++++++++++++- .../KeyValueStoreMaterializerTest.java | 3 +- .../internals/CachingKeyValueStoreTest.java | 4 +- .../ChangeLoggingKeyValueBytesStoreTest.java | 2 +- ...DelegatingPeekingKeyValueIteratorTest.java | 7 +- .../internals/FilteredCacheIteratorTest.java | 59 +++++++- .../internals/KeyValueBytesStoreWrapper.java | 122 ++++++++++++++++ ...edCacheKeyValueBytesStoreIteratorTest.java | 4 +- .../streams/MockProcessorContextTest.java | 12 +- 11 files changed, 358 insertions(+), 62 deletions(-) create mode 100644 streams/src/test/java/org/apache/kafka/streams/state/internals/KeyValueBytesStoreWrapper.java diff --git a/streams/src/main/java/org/apache/kafka/streams/state/Stores.java b/streams/src/main/java/org/apache/kafka/streams/state/Stores.java index d8b19fd20b373..46a9d45a0fc26 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/Stores.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/Stores.java @@ -104,7 +104,7 @@ public String name() { @Override public KeyValueStore get() { - return new InMemoryKeyValueStore<>(name, Serdes.Bytes(), Serdes.ByteArray()); + return new InMemoryKeyValueStore(name); } @Override diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStore.java index d6dd42ac264ff..1b5322980dcc5 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStore.java @@ -16,39 +16,27 @@ */ package org.apache.kafka.streams.state.internals; -import org.apache.kafka.common.serialization.Serde; +import java.util.List; +import org.apache.kafka.common.utils.Bytes; import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.processor.ProcessorContext; import org.apache.kafka.streams.processor.StateStore; -import org.apache.kafka.streams.processor.internals.ProcessorStateManager; import org.apache.kafka.streams.state.KeyValueIterator; import org.apache.kafka.streams.state.KeyValueStore; -import org.apache.kafka.streams.state.StateSerdes; import java.util.Iterator; -import java.util.List; import java.util.Map; import java.util.NavigableMap; import java.util.TreeMap; -public class InMemoryKeyValueStore implements KeyValueStore { +public class InMemoryKeyValueStore implements KeyValueStore { private final String name; - private final Serde keySerde; - private final Serde valueSerde; - private final NavigableMap map; + private final NavigableMap map; private volatile boolean open = false; - private StateSerdes serdes; - - public InMemoryKeyValueStore(final String name, - final Serde keySerde, - final Serde valueSerde) { + public InMemoryKeyValueStore(final String name) { this.name = name; - this.keySerde = keySerde; - this.valueSerde = valueSerde; - // TODO: when we have serde associated with class types, we can - // improve this situation by passing the comparator here. this.map = new TreeMap<>(); } @@ -61,20 +49,15 @@ public String name() { @SuppressWarnings("unchecked") public void init(final ProcessorContext context, final StateStore root) { - // construct the serde - this.serdes = new StateSerdes<>( - ProcessorStateManager.storeChangelogTopic(context.applicationId(), name), - keySerde == null ? (Serde) context.keySerde() : keySerde, - valueSerde == null ? (Serde) context.valueSerde() : valueSerde); if (root != null) { // register the store context.register(root, (key, value) -> { // this is a delete if (value == null) { - delete(serdes.keyFrom(key)); + delete(Bytes.wrap(key)); } else { - put(serdes.keyFrom(key), serdes.valueFrom(value)); + put(Bytes.wrap(key), value); } }); } @@ -93,13 +76,12 @@ public boolean isOpen() { } @Override - public synchronized V get(final K key) { + public synchronized byte[] get(final Bytes key) { return this.map.get(key); } @Override - public synchronized void put(final K key, - final V value) { + public synchronized void put(final Bytes key, final byte[] value) { if (value == null) { this.map.remove(key); } else { @@ -108,9 +90,8 @@ public synchronized void put(final K key, } @Override - public synchronized V putIfAbsent(final K key, - final V value) { - final V originalValue = get(key); + public synchronized byte[] putIfAbsent(final Bytes key, final byte[] value) { + final byte[] originalValue = get(key); if (originalValue == null) { put(key, value); } @@ -118,29 +99,29 @@ public synchronized V putIfAbsent(final K key, } @Override - public synchronized void putAll(final List> entries) { - for (final KeyValue entry : entries) { + public synchronized void putAll(final List> entries) { + for (final KeyValue entry : entries) { put(entry.key, entry.value); } } @Override - public synchronized V delete(final K key) { + public synchronized byte[] delete(final Bytes key) { return this.map.remove(key); } @Override - public synchronized KeyValueIterator range(final K from, - final K to) { + public synchronized KeyValueIterator range(final Bytes from, + final Bytes to) { return new DelegatingPeekingKeyValueIterator<>( name, - new InMemoryKeyValueIterator<>(this.map.subMap(from, true, to, true).entrySet().iterator())); + new InMemoryKeyValueIterator(this.map.subMap(from, true, to, true).entrySet().iterator())); } @Override - public synchronized KeyValueIterator all() { - final TreeMap copy = new TreeMap<>(this.map); - return new DelegatingPeekingKeyValueIterator<>(name, new InMemoryKeyValueIterator<>(copy.entrySet().iterator())); + public synchronized KeyValueIterator all() { + final TreeMap copy = new TreeMap<>(this.map); + return new DelegatingPeekingKeyValueIterator<>(name, new InMemoryKeyValueIterator(copy.entrySet().iterator())); } @Override @@ -159,10 +140,10 @@ public void close() { this.open = false; } - private static class InMemoryKeyValueIterator implements KeyValueIterator { - private final Iterator> iter; + private static class InMemoryKeyValueIterator implements KeyValueIterator { + private final Iterator> iter; - private InMemoryKeyValueIterator(final Iterator> iter) { + private InMemoryKeyValueIterator(final Iterator> iter) { this.iter = iter; } @@ -172,8 +153,8 @@ public boolean hasNext() { } @Override - public KeyValue next() { - final Map.Entry entry = iter.next(); + public KeyValue next() { + final Map.Entry entry = iter.next(); return new KeyValue<>(entry.getKey(), entry.getValue()); } @@ -188,7 +169,7 @@ public void close() { } @Override - public K peekNextKey() { + public Bytes peekNextKey() { throw new UnsupportedOperationException("peekNextKey() not supported in " + getClass().getName()); } } diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableReduceTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableReduceTest.java index 05b74dca9f45b..59d64c5eee923 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableReduceTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableReduceTest.java @@ -16,10 +16,20 @@ */ package org.apache.kafka.streams.kstream.internals; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import org.apache.kafka.common.serialization.Deserializer; +import org.apache.kafka.common.serialization.Serde; +import org.apache.kafka.common.serialization.Serdes; +import org.apache.kafka.common.serialization.Serdes.WrapperSerde; +import org.apache.kafka.common.serialization.Serializer; import org.apache.kafka.streams.processor.Processor; import org.apache.kafka.streams.processor.internals.AbstractProcessorContext; import org.apache.kafka.streams.processor.internals.ProcessorNode; +import org.apache.kafka.streams.state.KeyValueStore; import org.apache.kafka.streams.state.internals.InMemoryKeyValueStore; +import org.apache.kafka.streams.state.internals.KeyValueBytesStoreWrapper; import org.apache.kafka.test.InternalMockProcessorContext; import org.junit.Test; @@ -30,9 +40,13 @@ import static java.util.Collections.singleton; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; public class KTableReduceTest { + private final Serde stringSerde = Serdes.String(); + private final Serde> setSerde = new StringSetSerde(); + @Test public void shouldAddAndSubtract() { final AbstractProcessorContext context = new InternalMockProcessorContext(); @@ -44,9 +58,8 @@ public void shouldAddAndSubtract() { this::differenceNotNullArgs ).get(); - - final InMemoryKeyValueStore> myStore = - new InMemoryKeyValueStore<>("myStore", null, null); + final InMemoryKeyValueStore underlyingStore = new InMemoryKeyValueStore("myStore"); + final KeyValueStore> myStore = new KeyValueBytesStoreWrapper<>(underlyingStore, stringSerde, setSerde); context.register(myStore, null); reduceProcessor.init(context); @@ -58,6 +71,7 @@ public void shouldAddAndSubtract() { assertEquals(singleton("b"), myStore.get("A")); reduceProcessor.process("A", new Change<>(null, singleton("b"))); assertEquals(emptySet(), myStore.get("A")); + } private Set differenceNotNullArgs(final Set left, final Set right) { @@ -78,4 +92,118 @@ private Set unionNotNullArgs(final Set left, final Set r strings.addAll(right); return strings; } + + @Test + public void stringSetSerdeTest() { + final Set originalSet = new HashSet<>(); + final String topicName = "serdes"; + + final String string1 = "One"; + final String string2 = "Two"; + final String string3 = "Three"; + originalSet.add(string1); + originalSet.add(string2); + originalSet.add(string3); + + final byte[] bytes = setSerde.serializer().serialize(topicName, originalSet); + final Set newSet = setSerde.deserializer().deserialize(topicName, bytes); + + assertTrue(newSet.containsAll(originalSet)); + assertEquals(originalSet.size(), newSet.size()); + } + + private static final class StringSetSerde extends WrapperSerde> { + public StringSetSerde() { + super(new StringSetSerializer(), new StringSetDeserializer()); + } + + private static class StringSetSerializer implements Serializer> { + Serde stringSerde = Serdes.String(); + Serde intSerde = Serdes.Integer(); + + @Override + public void configure(final Map configs, final boolean isKey) { + } + + @Override + public byte[] serialize(final String topic, final Set data) { + if (data == null) + return null; + + final List bytesList = new LinkedList<>(); + int totalBytes = 0; + + // Data is encoded as 4 bytes containing N, the length in bytes of the next string, followed by the N bytes containing that string + for (final String string : data) { + final byte[] stringBytes = stringSerde.serializer().serialize(topic, string); + final int numStringBytes = stringBytes.length; + final byte[] lengthBytes = intSerde.serializer().serialize(topic, numStringBytes); + + totalBytes += numStringBytes + 4; + bytesList.add(lengthBytes); + bytesList.add(stringBytes); + } + + // now that we know the total number of bytes needed we can allocate an array of that size and copy the individual byte[] over + final byte[] serializedBytes = new byte[totalBytes]; + + int i = 0; + for (final byte[] bytes : bytesList) { + for (final byte b : bytes) { + serializedBytes[i++] = b; + } + } + return serializedBytes; + } + + @Override + public void close() { + // nothing to do + } + } + + private static class StringSetDeserializer implements Deserializer> { + final Serde stringSerde = Serdes.String(); + final Serde intSerde = Serdes.Integer(); + + @Override + public void configure(final Map configs, final boolean isKey) { + } + + @Override + public Set deserialize(final String topic, final byte[] data) { + if (data == null) + return null; + + final Set strings = new HashSet<>(); + + int i = 0; + while (i < data.length) { + final byte[] lengthBytes = getNBytes(data, i, 4); + final int stringLength = intSerde.deserializer().deserialize(topic, lengthBytes); + i += 4; + + final byte[] stringBytes = getNBytes(data, i, stringLength); + final String string = stringSerde.deserializer().deserialize(topic, stringBytes); + i += stringLength; + + strings.add(string); + } + return strings; + } + + @Override + public void close() { + // nothing to do + } + + private byte[] getNBytes(final byte[] data, int i, final int numBytes) { + final byte[] bytes = new byte[numBytes]; + for (int b = 0; b < numBytes; ++b, ++i) { + bytes[b] = data[i]; + } + return bytes; + } + } + } } diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/KeyValueStoreMaterializerTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/KeyValueStoreMaterializerTest.java index bb1dec2587112..30080c3f03cbd 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/KeyValueStoreMaterializerTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/KeyValueStoreMaterializerTest.java @@ -17,7 +17,6 @@ package org.apache.kafka.streams.processor.internals; -import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.common.utils.Bytes; import org.apache.kafka.streams.kstream.Materialized; import org.apache.kafka.streams.kstream.internals.InternalNameProvider; @@ -107,7 +106,7 @@ public void shouldCreateBuilderThatBuildsStoreWithCachingAndLoggingDisabled() { @Test public void shouldCreateKeyValueStoreWithTheProvidedInnerStore() { final KeyValueBytesStoreSupplier supplier = EasyMock.createNiceMock(KeyValueBytesStoreSupplier.class); - final InMemoryKeyValueStore store = new InMemoryKeyValueStore<>("name", Serdes.Bytes(), Serdes.ByteArray()); + final InMemoryKeyValueStore store = new InMemoryKeyValueStore("name"); EasyMock.expect(supplier.name()).andReturn("name").anyTimes(); EasyMock.expect(supplier.get()).andReturn(store); EasyMock.replay(supplier); diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingKeyValueStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingKeyValueStoreTest.java index b1a64014bbec7..6c2b7cf4f2b1c 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingKeyValueStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingKeyValueStoreTest.java @@ -58,7 +58,7 @@ public class CachingKeyValueStoreTest extends AbstractKeyValueStoreTest { private final int maxCacheSizeBytes = 150; private InternalMockProcessorContext context; private CachingKeyValueStore store; - private InMemoryKeyValueStore underlyingStore; + private InMemoryKeyValueStore underlyingStore; private ThreadCache cache; private CacheFlushListenerStub cacheFlushListener; private String topic; @@ -66,7 +66,7 @@ public class CachingKeyValueStoreTest extends AbstractKeyValueStoreTest { @Before public void setUp() { final String storeName = "store"; - underlyingStore = new InMemoryKeyValueStore<>(storeName, Serdes.Bytes(), Serdes.ByteArray()); + underlyingStore = new InMemoryKeyValueStore(storeName); cacheFlushListener = new CacheFlushListenerStub<>(); store = new CachingKeyValueStore<>(underlyingStore, Serdes.String(), Serdes.String()); store.setFlushListener(cacheFlushListener, false); diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStoreTest.java index 5fdfd46ccd3f0..5645b8bc84a5e 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStoreTest.java @@ -43,7 +43,7 @@ public class ChangeLoggingKeyValueBytesStoreTest { private InternalMockProcessorContext context; - private final InMemoryKeyValueStore inner = new InMemoryKeyValueStore<>("kv", Serdes.Bytes(), Serdes.ByteArray()); + private final InMemoryKeyValueStore inner = new InMemoryKeyValueStore("kv"); private final ChangeLoggingKeyValueBytesStore store = new ChangeLoggingKeyValueBytesStore(inner); private final Map sent = new HashMap<>(); private final Bytes hi = Bytes.wrap("hi".getBytes()); diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/DelegatingPeekingKeyValueIteratorTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/DelegatingPeekingKeyValueIteratorTest.java index 8b6fc9574cefe..fc85966704f30 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/DelegatingPeekingKeyValueIteratorTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/DelegatingPeekingKeyValueIteratorTest.java @@ -17,7 +17,9 @@ package org.apache.kafka.streams.state.internals; import org.apache.kafka.common.serialization.Serdes; +import org.apache.kafka.common.utils.Bytes; import org.apache.kafka.streams.KeyValue; +import org.apache.kafka.streams.state.KeyValueStore; import org.junit.Before; import org.junit.Test; @@ -29,11 +31,12 @@ public class DelegatingPeekingKeyValueIteratorTest { private final String name = "name"; - private InMemoryKeyValueStore store; + private KeyValueStore store; @Before public void setUp() { - store = new InMemoryKeyValueStore<>(name, Serdes.String(), Serdes.String()); + final KeyValueStore underlyingStore = new InMemoryKeyValueStore(name); + store = new KeyValueBytesStoreWrapper<>(underlyingStore, Serdes.String(), Serdes.String()); } @Test diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/FilteredCacheIteratorTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/FilteredCacheIteratorTest.java index 4a0796d3ea4e6..f2110db2204ff 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/FilteredCacheIteratorTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/FilteredCacheIteratorTest.java @@ -17,6 +17,11 @@ package org.apache.kafka.streams.state.internals; +import java.util.Iterator; +import java.util.Map; +import java.util.Map.Entry; +import java.util.NavigableMap; +import java.util.TreeMap; import org.apache.kafka.common.utils.Bytes; import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.state.KeyValueIterator; @@ -47,7 +52,7 @@ public Bytes cacheKey(final Bytes key) { }; @SuppressWarnings("unchecked") - private final InMemoryKeyValueStore store = new InMemoryKeyValueStore("name", null, null); + private final KeyValueStoreWrapper store = new KeyValueStoreWrapper<>(); private final KeyValue firstEntry = KeyValue.pair(Bytes.wrap("a".getBytes()), new LRUCacheEntry("1".getBytes())); private final List> entries = asList( @@ -127,4 +132,56 @@ public void shouldThrowUnsupportedOperationExeceptionOnRemove() { allIterator.remove(); } + public static class KeyValueStoreWrapper, V> { + private final NavigableMap map; + + KeyValueStoreWrapper() { + map = new TreeMap<>(); + } + + private void putAll(final List> entries) { + for (final KeyValue entry : entries) { + map.put(entry.key, entry.value); + } + } + + private KeyValueIterator all() { + return new KeyValueIteratorWrapper<>(map.entrySet().iterator()); + } + + private static class KeyValueIteratorWrapper implements KeyValueIterator { + private final Iterator> iter; + + private KeyValueIteratorWrapper(final Iterator> iter) { + this.iter = iter; + } + + @Override + public boolean hasNext() { + return iter.hasNext(); + } + + @Override + public KeyValue next() { + final Map.Entry entry = iter.next(); + return new KeyValue<>(entry.getKey(), entry.getValue()); + } + + @Override + public void remove() { + iter.remove(); + } + + @Override + public void close() { + // do nothing + } + + @Override + public K peekNextKey() { + throw new UnsupportedOperationException("peekNextKey() not supported in " + getClass().getName()); + } + } + } + } diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/KeyValueBytesStoreWrapper.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/KeyValueBytesStoreWrapper.java new file mode 100644 index 0000000000000..433d01ea82b7c --- /dev/null +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/KeyValueBytesStoreWrapper.java @@ -0,0 +1,122 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.state.internals; + +import java.util.List; +import org.apache.kafka.common.serialization.Serde; +import org.apache.kafka.common.utils.Bytes; +import org.apache.kafka.streams.KeyValue; +import org.apache.kafka.streams.processor.ProcessorContext; +import org.apache.kafka.streams.processor.StateStore; +import org.apache.kafka.streams.state.KeyValueIterator; +import org.apache.kafka.streams.state.KeyValueStore; +import org.apache.kafka.streams.state.StateSerdes; + +/* A generic wrapper around the key-value bytes stores for use in simple tests where only a basic store is required */ +public class KeyValueBytesStoreWrapper implements KeyValueStore { + + private final KeyValueStore store; + private final StateSerdes serdes; + + public KeyValueBytesStoreWrapper(final KeyValueStore store, final Serde keySerde, final Serde valueSerde) { + this.store = store; + serdes = new StateSerdes<>("serdes-topic-name", keySerde, valueSerde); + } + + public void put(final K key, final V value) { + store.put(Bytes.wrap(serdes.rawKey(key)), serdes.rawValue(value)); + } + + public V putIfAbsent(final K key, final V value) { + return serdes.valueFrom(store.putIfAbsent(Bytes.wrap(serdes.rawKey(key)), serdes.rawValue(value))); + } + + public void putAll(final List> entries) { + for (final KeyValue entry : entries) { + store.put(Bytes.wrap(serdes.rawKey(entry.key)), serdes.rawValue(entry.value)); + } + } + + public V delete(final K key) { + return serdes.valueFrom(store.delete(Bytes.wrap(serdes.rawKey(key)))); + } + + public V get(final K key) { + return serdes.valueFrom(store.get(Bytes.wrap(serdes.rawKey(key)))); + } + + public KeyValueIterator range(final K from, final K to) { + return new BytesIteratorWrapper(store.range(Bytes.wrap(serdes.rawKey(from)), Bytes.wrap(serdes.rawKey(to)))); + } + + public KeyValueIterator all() { + return new BytesIteratorWrapper(store.all()); + } + + public void init(final ProcessorContext context, final StateStore root) { + store.init(context, root); + } + + public String name() { + return store.name(); + } + + public void flush() { + store.flush(); + } + + public void close() { + store.close(); + } + + public boolean isOpen() { + return store.isOpen(); + } + + public boolean persistent() { + return store.persistent(); + } + + public long approximateNumEntries() { + return store.approximateNumEntries(); + } + + private class BytesIteratorWrapper implements KeyValueIterator { + private final KeyValueIterator underlying; + + BytesIteratorWrapper(final KeyValueIterator underlying) { + this.underlying = underlying; + } + + public boolean hasNext() { + return underlying.hasNext(); + } + + public KeyValue next() { + final KeyValue next = underlying.next(); + return new KeyValue<>(serdes.keyFrom(next.key.get()), serdes.valueFrom(next.value)); + } + + public void close() { + underlying.close(); + } + + public K peekNextKey() { + throw new UnsupportedOperationException("peekNextKey() not supported in " + getClass().getName()); + } + } +} diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/MergedSortedCacheKeyValueBytesStoreIteratorTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/MergedSortedCacheKeyValueBytesStoreIteratorTest.java index d7f164cef5faf..4028b0cd901f3 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/MergedSortedCacheKeyValueBytesStoreIteratorTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/MergedSortedCacheKeyValueBytesStoreIteratorTest.java @@ -39,7 +39,7 @@ public class MergedSortedCacheKeyValueBytesStoreIteratorTest { @Before public void setUp() throws Exception { - store = new InMemoryKeyValueStore<>(namespace, Serdes.Bytes(), Serdes.ByteArray()); + store = new InMemoryKeyValueStore(namespace); cache = new ThreadCache(new LogContext("testCache "), 10000L, new MockStreamsMetrics(new Metrics())); } @@ -146,7 +146,7 @@ public void shouldSkipAllDeletedFromCache() throws Exception { @Test public void shouldPeekNextKey() throws Exception { - final KeyValueStore kv = new InMemoryKeyValueStore<>("one", Serdes.Bytes(), Serdes.ByteArray()); + final KeyValueStore kv = new InMemoryKeyValueStore("one"); final ThreadCache cache = new ThreadCache(new LogContext("testCache "), 1000000L, new MockStreamsMetrics(new Metrics())); final byte[][] bytes = {{0}, {1}, {2}, {3}, {4}, {5}, {6}, {7}, {8}, {9}, {10}}; for (int i = 0; i < bytes.length - 1; i += 2) { diff --git a/streams/test-utils/src/test/java/org/apache/kafka/streams/MockProcessorContextTest.java b/streams/test-utils/src/test/java/org/apache/kafka/streams/MockProcessorContextTest.java index 41b62f9333341..00e8440a93f74 100644 --- a/streams/test-utils/src/test/java/org/apache/kafka/streams/MockProcessorContextTest.java +++ b/streams/test-utils/src/test/java/org/apache/kafka/streams/MockProcessorContextTest.java @@ -27,7 +27,8 @@ import org.apache.kafka.streams.processor.TaskId; import org.apache.kafka.streams.processor.To; import org.apache.kafka.streams.state.KeyValueStore; -import org.apache.kafka.streams.state.internals.InMemoryKeyValueStore; +import org.apache.kafka.streams.state.StoreBuilder; +import org.apache.kafka.streams.state.Stores; import org.junit.Test; import java.io.File; @@ -230,6 +231,7 @@ public void process(final String key, final Long value) { assertFalse(context.committed()); } + @SuppressWarnings("unchecked") @Test public void shouldStoreAndReturnStateStores() { final AbstractProcessor processor = new AbstractProcessor() { @@ -243,10 +245,14 @@ public void process(final String key, final Long value) { }; final MockProcessorContext context = new MockProcessorContext(); - final KeyValueStore store = new InMemoryKeyValueStore<>("my-state", Serdes.String(), Serdes.Long()); - context.register(store, null); + final StoreBuilder storeBuilder = Stores.keyValueStoreBuilder( + Stores.inMemoryKeyValueStore("my-state"), + Serdes.String(), + Serdes.Long()).withLoggingDisabled(); + final KeyValueStore store = (KeyValueStore) storeBuilder.build(); store.init(context, store); + processor.init(context); processor.process("foo", 5L); From 2efda8fa67de45c110bcf3ac0b8ddb7842a2c49e Mon Sep 17 00:00:00 2001 From: ableegoldman Date: Wed, 20 Feb 2019 14:17:08 -0800 Subject: [PATCH 2/4] Minor fixes and cosmetic changes from PR review --- .../internals/InMemoryKeyValueStore.java | 2 +- .../kstream/internals/KTableReduceTest.java | 16 +++++--- ...DelegatingPeekingKeyValueIteratorTest.java | 1 + .../internals/FilteredCacheIteratorTest.java | 12 +++--- .../KeyValueBytesStoreWrapper.java | 40 ++++++++++--------- .../streams/MockProcessorContextTest.java | 13 +++--- 6 files changed, 46 insertions(+), 38 deletions(-) rename streams/src/test/java/org/apache/kafka/{streams/state/internals => test}/KeyValueBytesStoreWrapper.java (73%) diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStore.java index 1b5322980dcc5..cc28d6487bb39 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStore.java @@ -112,7 +112,7 @@ public synchronized byte[] delete(final Bytes key) { @Override public synchronized KeyValueIterator range(final Bytes from, - final Bytes to) { + final Bytes to) { return new DelegatingPeekingKeyValueIterator<>( name, new InMemoryKeyValueIterator(this.map.subMap(from, true, to, true).entrySet().iterator())); diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableReduceTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableReduceTest.java index 59d64c5eee923..9187e29e56f7f 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableReduceTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableReduceTest.java @@ -29,8 +29,8 @@ import org.apache.kafka.streams.processor.internals.ProcessorNode; import org.apache.kafka.streams.state.KeyValueStore; import org.apache.kafka.streams.state.internals.InMemoryKeyValueStore; -import org.apache.kafka.streams.state.internals.KeyValueBytesStoreWrapper; import org.apache.kafka.test.InternalMockProcessorContext; +import org.apache.kafka.test.KeyValueBytesStoreWrapper; import org.junit.Test; import java.util.HashSet; @@ -71,7 +71,6 @@ public void shouldAddAndSubtract() { assertEquals(singleton("b"), myStore.get("A")); reduceProcessor.process("A", new Change<>(null, singleton("b"))); assertEquals(emptySet(), myStore.get("A")); - } private Set differenceNotNullArgs(final Set left, final Set right) { @@ -123,12 +122,15 @@ private static class StringSetSerializer implements Serializer> { @Override public void configure(final Map configs, final boolean isKey) { + stringSerde.configure(configs, isKey); + intSerde.configure(configs, isKey); } @Override public byte[] serialize(final String topic, final Set data) { - if (data == null) + if (data == null) { return null; + } final List bytesList = new LinkedList<>(); int totalBytes = 0; @@ -158,7 +160,8 @@ public byte[] serialize(final String topic, final Set data) { @Override public void close() { - // nothing to do + stringSerde.close(); + intSerde.close(); } } @@ -168,6 +171,8 @@ private static class StringSetDeserializer implements Deserializer> @Override public void configure(final Map configs, final boolean isKey) { + stringSerde.configure(configs, isKey); + intSerde.configure(configs, isKey); } @Override @@ -194,7 +199,8 @@ public Set deserialize(final String topic, final byte[] data) { @Override public void close() { - // nothing to do + stringSerde.close(); + intSerde.close(); } private byte[] getNBytes(final byte[] data, int i, final int numBytes) { diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/DelegatingPeekingKeyValueIteratorTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/DelegatingPeekingKeyValueIteratorTest.java index fc85966704f30..84f01ed1c29ee 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/DelegatingPeekingKeyValueIteratorTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/DelegatingPeekingKeyValueIteratorTest.java @@ -20,6 +20,7 @@ import org.apache.kafka.common.utils.Bytes; import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.state.KeyValueStore; +import org.apache.kafka.test.KeyValueBytesStoreWrapper; import org.junit.Before; import org.junit.Test; diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/FilteredCacheIteratorTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/FilteredCacheIteratorTest.java index f2110db2204ff..636a6187d522c 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/FilteredCacheIteratorTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/FilteredCacheIteratorTest.java @@ -52,7 +52,7 @@ public Bytes cacheKey(final Bytes key) { }; @SuppressWarnings("unchecked") - private final KeyValueStoreWrapper store = new KeyValueStoreWrapper<>(); + private final GenericKeyValueStore store = new GenericKeyValueStore<>(); private final KeyValue firstEntry = KeyValue.pair(Bytes.wrap("a".getBytes()), new LRUCacheEntry("1".getBytes())); private final List> entries = asList( @@ -132,10 +132,10 @@ public void shouldThrowUnsupportedOperationExeceptionOnRemove() { allIterator.remove(); } - public static class KeyValueStoreWrapper, V> { + private static class GenericKeyValueStore, V> { private final NavigableMap map; - KeyValueStoreWrapper() { + GenericKeyValueStore() { map = new TreeMap<>(); } @@ -146,13 +146,13 @@ private void putAll(final List> entries) { } private KeyValueIterator all() { - return new KeyValueIteratorWrapper<>(map.entrySet().iterator()); + return new GenericKeyValueIterator<>(map.entrySet().iterator()); } - private static class KeyValueIteratorWrapper implements KeyValueIterator { + private static class GenericKeyValueIterator implements KeyValueIterator { private final Iterator> iter; - private KeyValueIteratorWrapper(final Iterator> iter) { + private GenericKeyValueIterator(final Iterator> iter) { this.iter = iter; } diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/KeyValueBytesStoreWrapper.java b/streams/src/test/java/org/apache/kafka/test/KeyValueBytesStoreWrapper.java similarity index 73% rename from streams/src/test/java/org/apache/kafka/streams/state/internals/KeyValueBytesStoreWrapper.java rename to streams/src/test/java/org/apache/kafka/test/KeyValueBytesStoreWrapper.java index 433d01ea82b7c..8e05f714f7065 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/KeyValueBytesStoreWrapper.java +++ b/streams/src/test/java/org/apache/kafka/test/KeyValueBytesStoreWrapper.java @@ -14,7 +14,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.kafka.streams.state.internals; +package org.apache.kafka.test; import java.util.List; import org.apache.kafka.common.serialization.Serde; @@ -29,70 +29,72 @@ /* A generic wrapper around the key-value bytes stores for use in simple tests where only a basic store is required */ public class KeyValueBytesStoreWrapper implements KeyValueStore { - private final KeyValueStore store; + private final KeyValueStore wrapped; private final StateSerdes serdes; - public KeyValueBytesStoreWrapper(final KeyValueStore store, final Serde keySerde, final Serde valueSerde) { - this.store = store; + public KeyValueBytesStoreWrapper(final KeyValueStore wrapped, + final Serde keySerde, + final Serde valueSerde) { + this.wrapped = wrapped; serdes = new StateSerdes<>("serdes-topic-name", keySerde, valueSerde); } public void put(final K key, final V value) { - store.put(Bytes.wrap(serdes.rawKey(key)), serdes.rawValue(value)); + wrapped.put(Bytes.wrap(serdes.rawKey(key)), serdes.rawValue(value)); } public V putIfAbsent(final K key, final V value) { - return serdes.valueFrom(store.putIfAbsent(Bytes.wrap(serdes.rawKey(key)), serdes.rawValue(value))); + return serdes.valueFrom(wrapped.putIfAbsent(Bytes.wrap(serdes.rawKey(key)), serdes.rawValue(value))); } public void putAll(final List> entries) { for (final KeyValue entry : entries) { - store.put(Bytes.wrap(serdes.rawKey(entry.key)), serdes.rawValue(entry.value)); + wrapped.put(Bytes.wrap(serdes.rawKey(entry.key)), serdes.rawValue(entry.value)); } } public V delete(final K key) { - return serdes.valueFrom(store.delete(Bytes.wrap(serdes.rawKey(key)))); + return serdes.valueFrom(wrapped.delete(Bytes.wrap(serdes.rawKey(key)))); } public V get(final K key) { - return serdes.valueFrom(store.get(Bytes.wrap(serdes.rawKey(key)))); + return serdes.valueFrom(wrapped.get(Bytes.wrap(serdes.rawKey(key)))); } public KeyValueIterator range(final K from, final K to) { - return new BytesIteratorWrapper(store.range(Bytes.wrap(serdes.rawKey(from)), Bytes.wrap(serdes.rawKey(to)))); + return new BytesIteratorWrapper(wrapped.range(Bytes.wrap(serdes.rawKey(from)), Bytes.wrap(serdes.rawKey(to)))); } public KeyValueIterator all() { - return new BytesIteratorWrapper(store.all()); + return new BytesIteratorWrapper(wrapped.all()); } public void init(final ProcessorContext context, final StateStore root) { - store.init(context, root); + wrapped.init(context, root); } public String name() { - return store.name(); + return wrapped.name(); } public void flush() { - store.flush(); + wrapped.flush(); } public void close() { - store.close(); + wrapped.close(); } public boolean isOpen() { - return store.isOpen(); + return wrapped.isOpen(); } public boolean persistent() { - return store.persistent(); + return wrapped.persistent(); } public long approximateNumEntries() { - return store.approximateNumEntries(); + return wrapped.approximateNumEntries(); } private class BytesIteratorWrapper implements KeyValueIterator { @@ -116,7 +118,7 @@ public void close() { } public K peekNextKey() { - throw new UnsupportedOperationException("peekNextKey() not supported in " + getClass().getName()); + return serdes.keyFrom(underlying.peekNextKey().get()); } } } diff --git a/streams/test-utils/src/test/java/org/apache/kafka/streams/MockProcessorContextTest.java b/streams/test-utils/src/test/java/org/apache/kafka/streams/MockProcessorContextTest.java index 00e8440a93f74..b5aab52d7fc89 100644 --- a/streams/test-utils/src/test/java/org/apache/kafka/streams/MockProcessorContextTest.java +++ b/streams/test-utils/src/test/java/org/apache/kafka/streams/MockProcessorContextTest.java @@ -27,8 +27,9 @@ import org.apache.kafka.streams.processor.TaskId; import org.apache.kafka.streams.processor.To; import org.apache.kafka.streams.state.KeyValueStore; -import org.apache.kafka.streams.state.StoreBuilder; -import org.apache.kafka.streams.state.Stores; + +import org.apache.kafka.streams.state.internals.InMemoryKeyValueStore; +import org.apache.kafka.test.KeyValueBytesStoreWrapper; import org.junit.Test; import java.io.File; @@ -245,12 +246,10 @@ public void process(final String key, final Long value) { }; final MockProcessorContext context = new MockProcessorContext(); - final StoreBuilder storeBuilder = Stores.keyValueStoreBuilder( - Stores.inMemoryKeyValueStore("my-state"), - Serdes.String(), - Serdes.Long()).withLoggingDisabled(); - final KeyValueStore store = (KeyValueStore) storeBuilder.build(); + final InMemoryKeyValueStore underlyingStore = new InMemoryKeyValueStore("my-state"); + final KeyValueStore store = new KeyValueBytesStoreWrapper<>(underlyingStore, Serdes.String(), Serdes.Long()); + store.init(context, store); processor.init(context); From ea571b9337ec1c5ada94d61cad2a0f31c4b70137 Mon Sep 17 00:00:00 2001 From: ableegoldman Date: Fri, 22 Feb 2019 16:07:55 -0800 Subject: [PATCH 3/4] Added GenericInMemoryKeyValueStore to clean up testing --- .../DelegatingPeekingKeyValueIterator.java | 2 +- .../kstream/internals/KTableReduceTest.java | 139 +------------- ...DelegatingPeekingKeyValueIteratorTest.java | 7 +- .../internals/FilteredCacheIteratorTest.java | 61 +----- .../test/GenericInMemoryKeyValueStore.java | 174 ++++++++++++++++++ .../kafka/test/KeyValueBytesStoreWrapper.java | 124 ------------- .../streams/MockProcessorContextTest.java | 9 +- 7 files changed, 185 insertions(+), 331 deletions(-) create mode 100644 streams/src/test/java/org/apache/kafka/test/GenericInMemoryKeyValueStore.java delete mode 100644 streams/src/test/java/org/apache/kafka/test/KeyValueBytesStoreWrapper.java diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/DelegatingPeekingKeyValueIterator.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/DelegatingPeekingKeyValueIterator.java index 673a7c9ba2780..20a434a9a2c4e 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/DelegatingPeekingKeyValueIterator.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/DelegatingPeekingKeyValueIterator.java @@ -25,7 +25,7 @@ /** * Optimized {@link KeyValueIterator} used when the same element could be peeked multiple times. */ -class DelegatingPeekingKeyValueIterator implements KeyValueIterator, PeekingKeyValueIterator { +public class DelegatingPeekingKeyValueIterator implements KeyValueIterator, PeekingKeyValueIterator { private final KeyValueIterator underlying; private final String storeName; private KeyValue next; diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableReduceTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableReduceTest.java index 9187e29e56f7f..afb2cc12d94f5 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableReduceTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableReduceTest.java @@ -16,21 +16,12 @@ */ package org.apache.kafka.streams.kstream.internals; -import java.util.LinkedList; -import java.util.List; -import java.util.Map; -import org.apache.kafka.common.serialization.Deserializer; -import org.apache.kafka.common.serialization.Serde; -import org.apache.kafka.common.serialization.Serdes; -import org.apache.kafka.common.serialization.Serdes.WrapperSerde; -import org.apache.kafka.common.serialization.Serializer; import org.apache.kafka.streams.processor.Processor; import org.apache.kafka.streams.processor.internals.AbstractProcessorContext; import org.apache.kafka.streams.processor.internals.ProcessorNode; import org.apache.kafka.streams.state.KeyValueStore; -import org.apache.kafka.streams.state.internals.InMemoryKeyValueStore; +import org.apache.kafka.test.GenericInMemoryKeyValueStore; import org.apache.kafka.test.InternalMockProcessorContext; -import org.apache.kafka.test.KeyValueBytesStoreWrapper; import org.junit.Test; import java.util.HashSet; @@ -40,13 +31,9 @@ import static java.util.Collections.singleton; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertTrue; public class KTableReduceTest { - private final Serde stringSerde = Serdes.String(); - private final Serde> setSerde = new StringSetSerde(); - @Test public void shouldAddAndSubtract() { final AbstractProcessorContext context = new InternalMockProcessorContext(); @@ -58,8 +45,7 @@ public void shouldAddAndSubtract() { this::differenceNotNullArgs ).get(); - final InMemoryKeyValueStore underlyingStore = new InMemoryKeyValueStore("myStore"); - final KeyValueStore> myStore = new KeyValueBytesStoreWrapper<>(underlyingStore, stringSerde, setSerde); + final KeyValueStore> myStore = new GenericInMemoryKeyValueStore<>("myStore"); context.register(myStore, null); reduceProcessor.init(context); @@ -91,125 +77,4 @@ private Set unionNotNullArgs(final Set left, final Set r strings.addAll(right); return strings; } - - @Test - public void stringSetSerdeTest() { - final Set originalSet = new HashSet<>(); - final String topicName = "serdes"; - - final String string1 = "One"; - final String string2 = "Two"; - final String string3 = "Three"; - originalSet.add(string1); - originalSet.add(string2); - originalSet.add(string3); - - final byte[] bytes = setSerde.serializer().serialize(topicName, originalSet); - final Set newSet = setSerde.deserializer().deserialize(topicName, bytes); - - assertTrue(newSet.containsAll(originalSet)); - assertEquals(originalSet.size(), newSet.size()); - } - - private static final class StringSetSerde extends WrapperSerde> { - public StringSetSerde() { - super(new StringSetSerializer(), new StringSetDeserializer()); - } - - private static class StringSetSerializer implements Serializer> { - Serde stringSerde = Serdes.String(); - Serde intSerde = Serdes.Integer(); - - @Override - public void configure(final Map configs, final boolean isKey) { - stringSerde.configure(configs, isKey); - intSerde.configure(configs, isKey); - } - - @Override - public byte[] serialize(final String topic, final Set data) { - if (data == null) { - return null; - } - - final List bytesList = new LinkedList<>(); - int totalBytes = 0; - - // Data is encoded as 4 bytes containing N, the length in bytes of the next string, followed by the N bytes containing that string - for (final String string : data) { - final byte[] stringBytes = stringSerde.serializer().serialize(topic, string); - final int numStringBytes = stringBytes.length; - final byte[] lengthBytes = intSerde.serializer().serialize(topic, numStringBytes); - - totalBytes += numStringBytes + 4; - bytesList.add(lengthBytes); - bytesList.add(stringBytes); - } - - // now that we know the total number of bytes needed we can allocate an array of that size and copy the individual byte[] over - final byte[] serializedBytes = new byte[totalBytes]; - - int i = 0; - for (final byte[] bytes : bytesList) { - for (final byte b : bytes) { - serializedBytes[i++] = b; - } - } - return serializedBytes; - } - - @Override - public void close() { - stringSerde.close(); - intSerde.close(); - } - } - - private static class StringSetDeserializer implements Deserializer> { - final Serde stringSerde = Serdes.String(); - final Serde intSerde = Serdes.Integer(); - - @Override - public void configure(final Map configs, final boolean isKey) { - stringSerde.configure(configs, isKey); - intSerde.configure(configs, isKey); - } - - @Override - public Set deserialize(final String topic, final byte[] data) { - if (data == null) - return null; - - final Set strings = new HashSet<>(); - - int i = 0; - while (i < data.length) { - final byte[] lengthBytes = getNBytes(data, i, 4); - final int stringLength = intSerde.deserializer().deserialize(topic, lengthBytes); - i += 4; - - final byte[] stringBytes = getNBytes(data, i, stringLength); - final String string = stringSerde.deserializer().deserialize(topic, stringBytes); - i += stringLength; - - strings.add(string); - } - return strings; - } - - @Override - public void close() { - stringSerde.close(); - intSerde.close(); - } - - private byte[] getNBytes(final byte[] data, int i, final int numBytes) { - final byte[] bytes = new byte[numBytes]; - for (int b = 0; b < numBytes; ++b, ++i) { - bytes[b] = data[i]; - } - return bytes; - } - } - } } diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/DelegatingPeekingKeyValueIteratorTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/DelegatingPeekingKeyValueIteratorTest.java index 84f01ed1c29ee..593b265aa02c3 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/DelegatingPeekingKeyValueIteratorTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/DelegatingPeekingKeyValueIteratorTest.java @@ -16,11 +16,9 @@ */ package org.apache.kafka.streams.state.internals; -import org.apache.kafka.common.serialization.Serdes; -import org.apache.kafka.common.utils.Bytes; import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.state.KeyValueStore; -import org.apache.kafka.test.KeyValueBytesStoreWrapper; +import org.apache.kafka.test.GenericInMemoryKeyValueStore; import org.junit.Before; import org.junit.Test; @@ -36,8 +34,7 @@ public class DelegatingPeekingKeyValueIteratorTest { @Before public void setUp() { - final KeyValueStore underlyingStore = new InMemoryKeyValueStore(name); - store = new KeyValueBytesStoreWrapper<>(underlyingStore, Serdes.String(), Serdes.String()); + store = new GenericInMemoryKeyValueStore<>(name); } @Test diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/FilteredCacheIteratorTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/FilteredCacheIteratorTest.java index 636a6187d522c..bf54786fff89f 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/FilteredCacheIteratorTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/FilteredCacheIteratorTest.java @@ -17,14 +17,11 @@ package org.apache.kafka.streams.state.internals; -import java.util.Iterator; -import java.util.Map; -import java.util.Map.Entry; -import java.util.NavigableMap; -import java.util.TreeMap; import org.apache.kafka.common.utils.Bytes; import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.state.KeyValueIterator; +import org.apache.kafka.streams.state.KeyValueStore; +import org.apache.kafka.test.GenericInMemoryKeyValueStore; import org.junit.Before; import org.junit.Test; @@ -52,7 +49,7 @@ public Bytes cacheKey(final Bytes key) { }; @SuppressWarnings("unchecked") - private final GenericKeyValueStore store = new GenericKeyValueStore<>(); + private final KeyValueStore store = new GenericInMemoryKeyValueStore<>("my-store"); private final KeyValue firstEntry = KeyValue.pair(Bytes.wrap("a".getBytes()), new LRUCacheEntry("1".getBytes())); private final List> entries = asList( @@ -132,56 +129,4 @@ public void shouldThrowUnsupportedOperationExeceptionOnRemove() { allIterator.remove(); } - private static class GenericKeyValueStore, V> { - private final NavigableMap map; - - GenericKeyValueStore() { - map = new TreeMap<>(); - } - - private void putAll(final List> entries) { - for (final KeyValue entry : entries) { - map.put(entry.key, entry.value); - } - } - - private KeyValueIterator all() { - return new GenericKeyValueIterator<>(map.entrySet().iterator()); - } - - private static class GenericKeyValueIterator implements KeyValueIterator { - private final Iterator> iter; - - private GenericKeyValueIterator(final Iterator> iter) { - this.iter = iter; - } - - @Override - public boolean hasNext() { - return iter.hasNext(); - } - - @Override - public KeyValue next() { - final Map.Entry entry = iter.next(); - return new KeyValue<>(entry.getKey(), entry.getValue()); - } - - @Override - public void remove() { - iter.remove(); - } - - @Override - public void close() { - // do nothing - } - - @Override - public K peekNextKey() { - throw new UnsupportedOperationException("peekNextKey() not supported in " + getClass().getName()); - } - } - } - } diff --git a/streams/src/test/java/org/apache/kafka/test/GenericInMemoryKeyValueStore.java b/streams/src/test/java/org/apache/kafka/test/GenericInMemoryKeyValueStore.java new file mode 100644 index 0000000000000..e9d20f1a4fbd4 --- /dev/null +++ b/streams/src/test/java/org/apache/kafka/test/GenericInMemoryKeyValueStore.java @@ -0,0 +1,174 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.test; + +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.NavigableMap; +import java.util.TreeMap; +import org.apache.kafka.streams.KeyValue; +import org.apache.kafka.streams.processor.ProcessorContext; +import org.apache.kafka.streams.processor.StateStore; +import org.apache.kafka.streams.state.KeyValueIterator; +import org.apache.kafka.streams.state.KeyValueStore; +import org.apache.kafka.streams.state.internals.DelegatingPeekingKeyValueIterator; + +/** + * This class is a generic version of the in-memory key-value store that is useful for testing when you + * need a basic KeyValueStore for arbitrary types and don't have/want to write a serde + */ +public class GenericInMemoryKeyValueStore implements KeyValueStore { + + private final String name; + private final NavigableMap map; + private volatile boolean open = false; + + public GenericInMemoryKeyValueStore(final String name) { + this.name = name; + + this.map = new TreeMap<>(); + } + + @Override + public String name() { + return this.name; + } + + @Override + @SuppressWarnings("unchecked") + /* This is a "dummy" store used for testing and does not support restoring from changelog since we allow it to be serde-ignorant */ + public void init(final ProcessorContext context, final StateStore root) { + if (root != null) { + context.register(root, null); + } + + this.open = true; + } + + @Override + public boolean persistent() { + return false; + } + + @Override + public boolean isOpen() { + return this.open; + } + + @Override + public synchronized V get(final K key) { + return this.map.get(key); + } + + @Override + public synchronized void put(final K key, + final V value) { + if (value == null) { + this.map.remove(key); + } else { + this.map.put(key, value); + } + } + + @Override + public synchronized V putIfAbsent(final K key, + final V value) { + final V originalValue = get(key); + if (originalValue == null) { + put(key, value); + } + return originalValue; + } + + @Override + public synchronized void putAll(final List> entries) { + for (final KeyValue entry : entries) { + put(entry.key, entry.value); + } + } + + @Override + public synchronized V delete(final K key) { + return this.map.remove(key); + } + + @Override + public synchronized KeyValueIterator range(final K from, + final K to) { + return new DelegatingPeekingKeyValueIterator<>( + name, + new GenericInMemoryKeyValueIterator<>(this.map.subMap(from, true, to, true).entrySet().iterator())); + } + + @Override + public synchronized KeyValueIterator all() { + final TreeMap copy = new TreeMap<>(this.map); + return new DelegatingPeekingKeyValueIterator<>(name, new GenericInMemoryKeyValueIterator<>(copy.entrySet().iterator())); + } + + @Override + public long approximateNumEntries() { + return this.map.size(); + } + + @Override + public void flush() { + // do-nothing since it is in-memory + } + + @Override + public void close() { + this.map.clear(); + this.open = false; + } + + private static class GenericInMemoryKeyValueIterator implements KeyValueIterator { + private final Iterator> iter; + + private GenericInMemoryKeyValueIterator(final Iterator> iter) { + this.iter = iter; + } + + @Override + public boolean hasNext() { + return iter.hasNext(); + } + + @Override + public KeyValue next() { + final Map.Entry entry = iter.next(); + return new KeyValue<>(entry.getKey(), entry.getValue()); + } + + @Override + public void remove() { + iter.remove(); + } + + @Override + public void close() { + // do nothing + } + + @Override + public K peekNextKey() { + throw new UnsupportedOperationException("peekNextKey() not supported in " + getClass().getName()); + } + } +} \ No newline at end of file diff --git a/streams/src/test/java/org/apache/kafka/test/KeyValueBytesStoreWrapper.java b/streams/src/test/java/org/apache/kafka/test/KeyValueBytesStoreWrapper.java deleted file mode 100644 index 8e05f714f7065..0000000000000 --- a/streams/src/test/java/org/apache/kafka/test/KeyValueBytesStoreWrapper.java +++ /dev/null @@ -1,124 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.kafka.test; - -import java.util.List; -import org.apache.kafka.common.serialization.Serde; -import org.apache.kafka.common.utils.Bytes; -import org.apache.kafka.streams.KeyValue; -import org.apache.kafka.streams.processor.ProcessorContext; -import org.apache.kafka.streams.processor.StateStore; -import org.apache.kafka.streams.state.KeyValueIterator; -import org.apache.kafka.streams.state.KeyValueStore; -import org.apache.kafka.streams.state.StateSerdes; - -/* A generic wrapper around the key-value bytes stores for use in simple tests where only a basic store is required */ -public class KeyValueBytesStoreWrapper implements KeyValueStore { - - private final KeyValueStore wrapped; - private final StateSerdes serdes; - - public KeyValueBytesStoreWrapper(final KeyValueStore wrapped, - final Serde keySerde, - final Serde valueSerde) { - this.wrapped = wrapped; - serdes = new StateSerdes<>("serdes-topic-name", keySerde, valueSerde); - } - - public void put(final K key, final V value) { - wrapped.put(Bytes.wrap(serdes.rawKey(key)), serdes.rawValue(value)); - } - - public V putIfAbsent(final K key, final V value) { - return serdes.valueFrom(wrapped.putIfAbsent(Bytes.wrap(serdes.rawKey(key)), serdes.rawValue(value))); - } - - public void putAll(final List> entries) { - for (final KeyValue entry : entries) { - wrapped.put(Bytes.wrap(serdes.rawKey(entry.key)), serdes.rawValue(entry.value)); - } - } - - public V delete(final K key) { - return serdes.valueFrom(wrapped.delete(Bytes.wrap(serdes.rawKey(key)))); - } - - public V get(final K key) { - return serdes.valueFrom(wrapped.get(Bytes.wrap(serdes.rawKey(key)))); - } - - public KeyValueIterator range(final K from, final K to) { - return new BytesIteratorWrapper(wrapped.range(Bytes.wrap(serdes.rawKey(from)), Bytes.wrap(serdes.rawKey(to)))); - } - - public KeyValueIterator all() { - return new BytesIteratorWrapper(wrapped.all()); - } - - public void init(final ProcessorContext context, final StateStore root) { - wrapped.init(context, root); - } - - public String name() { - return wrapped.name(); - } - - public void flush() { - wrapped.flush(); - } - - public void close() { - wrapped.close(); - } - - public boolean isOpen() { - return wrapped.isOpen(); - } - - public boolean persistent() { - return wrapped.persistent(); - } - - public long approximateNumEntries() { - return wrapped.approximateNumEntries(); - } - - private class BytesIteratorWrapper implements KeyValueIterator { - private final KeyValueIterator underlying; - - BytesIteratorWrapper(final KeyValueIterator underlying) { - this.underlying = underlying; - } - - public boolean hasNext() { - return underlying.hasNext(); - } - - public KeyValue next() { - final KeyValue next = underlying.next(); - return new KeyValue<>(serdes.keyFrom(next.key.get()), serdes.valueFrom(next.value)); - } - - public void close() { - underlying.close(); - } - - public K peekNextKey() { - return serdes.keyFrom(underlying.peekNextKey().get()); - } - } -} diff --git a/streams/test-utils/src/test/java/org/apache/kafka/streams/MockProcessorContextTest.java b/streams/test-utils/src/test/java/org/apache/kafka/streams/MockProcessorContextTest.java index b5aab52d7fc89..3474919cf8719 100644 --- a/streams/test-utils/src/test/java/org/apache/kafka/streams/MockProcessorContextTest.java +++ b/streams/test-utils/src/test/java/org/apache/kafka/streams/MockProcessorContextTest.java @@ -28,8 +28,7 @@ import org.apache.kafka.streams.processor.To; import org.apache.kafka.streams.state.KeyValueStore; -import org.apache.kafka.streams.state.internals.InMemoryKeyValueStore; -import org.apache.kafka.test.KeyValueBytesStoreWrapper; +import org.apache.kafka.test.GenericInMemoryKeyValueStore; import org.junit.Test; import java.io.File; @@ -231,8 +230,7 @@ public void process(final String key, final Long value) { assertFalse(context.committed()); } - - @SuppressWarnings("unchecked") + @Test public void shouldStoreAndReturnStateStores() { final AbstractProcessor processor = new AbstractProcessor() { @@ -247,8 +245,7 @@ public void process(final String key, final Long value) { final MockProcessorContext context = new MockProcessorContext(); - final InMemoryKeyValueStore underlyingStore = new InMemoryKeyValueStore("my-state"); - final KeyValueStore store = new KeyValueBytesStoreWrapper<>(underlyingStore, Serdes.String(), Serdes.Long()); + final KeyValueStore store = new GenericInMemoryKeyValueStore<>("my-state"); store.init(context, store); From fc285251b32ef49c43e2bba3101f1504791815b7 Mon Sep 17 00:00:00 2001 From: ableegoldman Date: Mon, 25 Feb 2019 14:20:01 -0800 Subject: [PATCH 4/4] MockProcessorContextTest: replaced GenericInMemoryKeyValueStore with InMemoryKeyValueStore from a storeBuilder --- .../kafka/streams/MockProcessorContextTest.java | 13 ++++++++++--- 1 file changed, 10 insertions(+), 3 deletions(-) diff --git a/streams/test-utils/src/test/java/org/apache/kafka/streams/MockProcessorContextTest.java b/streams/test-utils/src/test/java/org/apache/kafka/streams/MockProcessorContextTest.java index 3474919cf8719..32c479caaa1a4 100644 --- a/streams/test-utils/src/test/java/org/apache/kafka/streams/MockProcessorContextTest.java +++ b/streams/test-utils/src/test/java/org/apache/kafka/streams/MockProcessorContextTest.java @@ -28,7 +28,8 @@ import org.apache.kafka.streams.processor.To; import org.apache.kafka.streams.state.KeyValueStore; -import org.apache.kafka.test.GenericInMemoryKeyValueStore; +import org.apache.kafka.streams.state.StoreBuilder; +import org.apache.kafka.streams.state.Stores; import org.junit.Test; import java.io.File; @@ -230,7 +231,8 @@ public void process(final String key, final Long value) { assertFalse(context.committed()); } - + + @SuppressWarnings("unchecked") @Test public void shouldStoreAndReturnStateStores() { final AbstractProcessor processor = new AbstractProcessor() { @@ -245,7 +247,12 @@ public void process(final String key, final Long value) { final MockProcessorContext context = new MockProcessorContext(); - final KeyValueStore store = new GenericInMemoryKeyValueStore<>("my-state"); + final StoreBuilder storeBuilder = Stores.keyValueStoreBuilder( + Stores.inMemoryKeyValueStore("my-state"), + Serdes.String(), + Serdes.Long()).withLoggingDisabled(); + + final KeyValueStore store = (KeyValueStore) storeBuilder.build(); store.init(context, store);