Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ public String name() {

@Override
public KeyValueStore<Bytes, byte[]> get() {
return new InMemoryKeyValueStore<>(name, Serdes.Bytes(), Serdes.ByteArray());
return new InMemoryKeyValueStore(name);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
/**
* Optimized {@link KeyValueIterator} used when the same element could be peeked multiple times.
*/
class DelegatingPeekingKeyValueIterator<K, V> implements KeyValueIterator<K, V>, PeekingKeyValueIterator<K, V> {
public class DelegatingPeekingKeyValueIterator<K, V> implements KeyValueIterator<K, V>, PeekingKeyValueIterator<K, V> {
private final KeyValueIterator<K, V> underlying;
private final String storeName;
private KeyValue<K, V> next;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,39 +16,27 @@
*/
package org.apache.kafka.streams.state.internals;

import org.apache.kafka.common.serialization.Serde;
import java.util.List;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.internals.ProcessorStateManager;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.StateSerdes;

import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;

public class InMemoryKeyValueStore<K, V> implements KeyValueStore<K, V> {
public class InMemoryKeyValueStore implements KeyValueStore<Bytes, byte[]> {
private final String name;
private final Serde<K> keySerde;
private final Serde<V> valueSerde;
private final NavigableMap<K, V> map;
private final NavigableMap<Bytes, byte[]> map;
private volatile boolean open = false;

private StateSerdes<K, V> serdes;

public InMemoryKeyValueStore(final String name,
final Serde<K> keySerde,
final Serde<V> valueSerde) {
public InMemoryKeyValueStore(final String name) {
this.name = name;
this.keySerde = keySerde;
this.valueSerde = valueSerde;

// TODO: when we have serde associated with class types, we can
// improve this situation by passing the comparator here.
this.map = new TreeMap<>();
}

Expand All @@ -61,20 +49,15 @@ public String name() {
@SuppressWarnings("unchecked")
public void init(final ProcessorContext context,
final StateStore root) {
// construct the serde
this.serdes = new StateSerdes<>(
ProcessorStateManager.storeChangelogTopic(context.applicationId(), name),
keySerde == null ? (Serde<K>) context.keySerde() : keySerde,
valueSerde == null ? (Serde<V>) context.valueSerde() : valueSerde);

if (root != null) {
// register the store
context.register(root, (key, value) -> {
// this is a delete
if (value == null) {
delete(serdes.keyFrom(key));
delete(Bytes.wrap(key));
} else {
put(serdes.keyFrom(key), serdes.valueFrom(value));
put(Bytes.wrap(key), value);
}
});
}
Expand All @@ -93,13 +76,12 @@ public boolean isOpen() {
}

@Override
public synchronized V get(final K key) {
public synchronized byte[] get(final Bytes key) {
return this.map.get(key);
}

@Override
public synchronized void put(final K key,
final V value) {
public synchronized void put(final Bytes key, final byte[] value) {
if (value == null) {
this.map.remove(key);
} else {
Expand All @@ -108,39 +90,38 @@ public synchronized void put(final K key,
}

@Override
public synchronized V putIfAbsent(final K key,
final V value) {
final V originalValue = get(key);
public synchronized byte[] putIfAbsent(final Bytes key, final byte[] value) {
final byte[] originalValue = get(key);
if (originalValue == null) {
put(key, value);
}
return originalValue;
}

@Override
public synchronized void putAll(final List<KeyValue<K, V>> entries) {
for (final KeyValue<K, V> entry : entries) {
public synchronized void putAll(final List<KeyValue<Bytes, byte[]>> entries) {
for (final KeyValue<Bytes, byte[]> entry : entries) {
put(entry.key, entry.value);
}
}

@Override
public synchronized V delete(final K key) {
public synchronized byte[] delete(final Bytes key) {
return this.map.remove(key);
}

@Override
public synchronized KeyValueIterator<K, V> range(final K from,
final K to) {
public synchronized KeyValueIterator<Bytes, byte[]> range(final Bytes from,
final Bytes to) {
return new DelegatingPeekingKeyValueIterator<>(
name,
new InMemoryKeyValueIterator<>(this.map.subMap(from, true, to, true).entrySet().iterator()));
new InMemoryKeyValueIterator(this.map.subMap(from, true, to, true).entrySet().iterator()));
}

@Override
public synchronized KeyValueIterator<K, V> all() {
final TreeMap<K, V> copy = new TreeMap<>(this.map);
return new DelegatingPeekingKeyValueIterator<>(name, new InMemoryKeyValueIterator<>(copy.entrySet().iterator()));
public synchronized KeyValueIterator<Bytes, byte[]> all() {
final TreeMap<Bytes, byte[]> copy = new TreeMap<>(this.map);
return new DelegatingPeekingKeyValueIterator<>(name, new InMemoryKeyValueIterator(copy.entrySet().iterator()));
}

@Override
Expand All @@ -159,10 +140,10 @@ public void close() {
this.open = false;
}

private static class InMemoryKeyValueIterator<K, V> implements KeyValueIterator<K, V> {
private final Iterator<Map.Entry<K, V>> iter;
private static class InMemoryKeyValueIterator implements KeyValueIterator<Bytes, byte[]> {
private final Iterator<Map.Entry<Bytes, byte[]>> iter;

private InMemoryKeyValueIterator(final Iterator<Map.Entry<K, V>> iter) {
private InMemoryKeyValueIterator(final Iterator<Map.Entry<Bytes, byte[]>> iter) {
this.iter = iter;
}

Expand All @@ -172,8 +153,8 @@ public boolean hasNext() {
}

@Override
public KeyValue<K, V> next() {
final Map.Entry<K, V> entry = iter.next();
public KeyValue<Bytes, byte[]> next() {
final Map.Entry<Bytes, byte[]> entry = iter.next();
return new KeyValue<>(entry.getKey(), entry.getValue());
}

Expand All @@ -188,7 +169,7 @@ public void close() {
}

@Override
public K peekNextKey() {
public Bytes peekNextKey() {
throw new UnsupportedOperationException("peekNextKey() not supported in " + getClass().getName());
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,8 @@
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.internals.AbstractProcessorContext;
import org.apache.kafka.streams.processor.internals.ProcessorNode;
import org.apache.kafka.streams.state.internals.InMemoryKeyValueStore;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.test.GenericInMemoryKeyValueStore;
import org.apache.kafka.test.InternalMockProcessorContext;
import org.junit.Test;

Expand All @@ -44,9 +45,7 @@ public void shouldAddAndSubtract() {
this::differenceNotNullArgs
).get();


final InMemoryKeyValueStore<String, Set<String>> myStore =
new InMemoryKeyValueStore<>("myStore", null, null);
final KeyValueStore<String, Set<String>> myStore = new GenericInMemoryKeyValueStore<>("myStore");

context.register(myStore, null);
reduceProcessor.init(context);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@

package org.apache.kafka.streams.processor.internals;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.internals.InternalNameProvider;
Expand Down Expand Up @@ -107,7 +106,7 @@ public void shouldCreateBuilderThatBuildsStoreWithCachingAndLoggingDisabled() {
@Test
public void shouldCreateKeyValueStoreWithTheProvidedInnerStore() {
final KeyValueBytesStoreSupplier supplier = EasyMock.createNiceMock(KeyValueBytesStoreSupplier.class);
final InMemoryKeyValueStore<Bytes, byte[]> store = new InMemoryKeyValueStore<>("name", Serdes.Bytes(), Serdes.ByteArray());
final InMemoryKeyValueStore store = new InMemoryKeyValueStore("name");
EasyMock.expect(supplier.name()).andReturn("name").anyTimes();
EasyMock.expect(supplier.get()).andReturn(store);
EasyMock.replay(supplier);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,15 +58,15 @@ public class CachingKeyValueStoreTest extends AbstractKeyValueStoreTest {
private final int maxCacheSizeBytes = 150;
private InternalMockProcessorContext context;
private CachingKeyValueStore<String, String> store;
private InMemoryKeyValueStore<Bytes, byte[]> underlyingStore;
private InMemoryKeyValueStore underlyingStore;
private ThreadCache cache;
private CacheFlushListenerStub<String, String> cacheFlushListener;
private String topic;

@Before
public void setUp() {
final String storeName = "store";
underlyingStore = new InMemoryKeyValueStore<>(storeName, Serdes.Bytes(), Serdes.ByteArray());
underlyingStore = new InMemoryKeyValueStore(storeName);
cacheFlushListener = new CacheFlushListenerStub<>();
store = new CachingKeyValueStore<>(underlyingStore, Serdes.String(), Serdes.String());
store.setFlushListener(cacheFlushListener, false);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@
public class ChangeLoggingKeyValueBytesStoreTest {

private InternalMockProcessorContext context;
private final InMemoryKeyValueStore<Bytes, byte[]> inner = new InMemoryKeyValueStore<>("kv", Serdes.Bytes(), Serdes.ByteArray());
private final InMemoryKeyValueStore inner = new InMemoryKeyValueStore("kv");
private final ChangeLoggingKeyValueBytesStore store = new ChangeLoggingKeyValueBytesStore(inner);
private final Map<Object, Object> sent = new HashMap<>();
private final Bytes hi = Bytes.wrap("hi".getBytes());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,9 @@
*/
package org.apache.kafka.streams.state.internals;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.test.GenericInMemoryKeyValueStore;
import org.junit.Before;
import org.junit.Test;

Expand All @@ -29,11 +30,11 @@
public class DelegatingPeekingKeyValueIteratorTest {

private final String name = "name";
private InMemoryKeyValueStore<String, String> store;
private KeyValueStore<String, String> store;

@Before
public void setUp() {
store = new InMemoryKeyValueStore<>(name, Serdes.String(), Serdes.String());
store = new GenericInMemoryKeyValueStore<>(name);
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.test.GenericInMemoryKeyValueStore;
import org.junit.Before;
import org.junit.Test;

Expand Down Expand Up @@ -47,7 +49,7 @@ public Bytes cacheKey(final Bytes key) {
};

@SuppressWarnings("unchecked")
private final InMemoryKeyValueStore<Bytes, LRUCacheEntry> store = new InMemoryKeyValueStore("name", null, null);
private final KeyValueStore<Bytes, LRUCacheEntry> store = new GenericInMemoryKeyValueStore<>("my-store");
private final KeyValue<Bytes, LRUCacheEntry> firstEntry = KeyValue.pair(Bytes.wrap("a".getBytes()),
new LRUCacheEntry("1".getBytes()));
private final List<KeyValue<Bytes, LRUCacheEntry>> entries = asList(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ public class MergedSortedCacheKeyValueBytesStoreIteratorTest {

@Before
public void setUp() throws Exception {
store = new InMemoryKeyValueStore<>(namespace, Serdes.Bytes(), Serdes.ByteArray());
store = new InMemoryKeyValueStore(namespace);
cache = new ThreadCache(new LogContext("testCache "), 10000L, new MockStreamsMetrics(new Metrics()));
}

Expand Down Expand Up @@ -146,7 +146,7 @@ public void shouldSkipAllDeletedFromCache() throws Exception {

@Test
public void shouldPeekNextKey() throws Exception {
final KeyValueStore<Bytes, byte[]> kv = new InMemoryKeyValueStore<>("one", Serdes.Bytes(), Serdes.ByteArray());
final KeyValueStore<Bytes, byte[]> kv = new InMemoryKeyValueStore("one");
final ThreadCache cache = new ThreadCache(new LogContext("testCache "), 1000000L, new MockStreamsMetrics(new Metrics()));
final byte[][] bytes = {{0}, {1}, {2}, {3}, {4}, {5}, {6}, {7}, {8}, {9}, {10}};
for (int i = 0; i < bytes.length - 1; i += 2) {
Expand Down
Loading