diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamAggregate.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamAggregate.java index 648c50b7e50cf..4ead76bca845c 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamAggregate.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamAggregate.java @@ -63,7 +63,7 @@ public void init(final ProcessorContext context) { super.init(context); metrics = (StreamsMetricsImpl) context.metrics(); store = (KeyValueStore) context.getStateStore(storeName); - tupleForwarder = new TupleForwarder<>(store, context, new ForwardingCacheFlushListener(context), sendOldValues); + tupleForwarder = new TupleForwarder<>(store, context, new ForwardingCacheFlushListener<>(context), sendOldValues); } diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregate.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregate.java index 60f1b6ae2ae49..bcadf840bde2d 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregate.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregate.java @@ -93,7 +93,7 @@ public void init(final ProcessorContext context) { lateRecordDropSensor = Sensors.lateRecordDropSensor(internalProcessorContext); store = (SessionStore) context.getStateStore(storeName); - tupleForwarder = new TupleForwarder<>(store, context, new ForwardingCacheFlushListener(context), sendOldValues); + tupleForwarder = new TupleForwarder<>(store, context, new ForwardingCacheFlushListener<>(context), sendOldValues); } @Override diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregate.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregate.java index e5f290e0eca22..2facc6d24660f 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregate.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregate.java @@ -89,7 +89,7 @@ public void init(final ProcessorContext context) { lateRecordDropSensor = Sensors.lateRecordDropSensor(internalProcessorContext); windowStore = (WindowStore) context.getStateStore(storeName); - tupleForwarder = new TupleForwarder<>(windowStore, context, new ForwardingCacheFlushListener, V>(context), sendOldValues); + tupleForwarder = new TupleForwarder<>(windowStore, context, new ForwardingCacheFlushListener<>(context), sendOldValues); } @Override diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableAggregate.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableAggregate.java index b04a729a44702..c53224e845637 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableAggregate.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableAggregate.java @@ -62,7 +62,7 @@ private class KTableAggregateProcessor extends AbstractProcessor> { public void init(final ProcessorContext context) { super.init(context); store = (KeyValueStore) context.getStateStore(storeName); - tupleForwarder = new TupleForwarder<>(store, context, new ForwardingCacheFlushListener(context), sendOldValues); + tupleForwarder = new TupleForwarder<>(store, context, new ForwardingCacheFlushListener<>(context), sendOldValues); } /** diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TupleForwarder.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TupleForwarder.java index 0862e47ef6daf..323e198aaecbb 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TupleForwarder.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TupleForwarder.java @@ -18,7 +18,6 @@ import org.apache.kafka.streams.processor.ProcessorContext; import org.apache.kafka.streams.processor.StateStore; -import org.apache.kafka.streams.state.internals.CachedStateStore; import org.apache.kafka.streams.state.internals.WrappedStateStore; /** @@ -30,45 +29,24 @@ * @param */ class TupleForwarder { - private final CachedStateStore cachedStateStore; + private final boolean cachingEnabled; private final ProcessorContext context; @SuppressWarnings("unchecked") TupleForwarder(final StateStore store, final ProcessorContext context, - final ForwardingCacheFlushListener flushListener, + final ForwardingCacheFlushListener flushListener, final boolean sendOldValues) { - this.cachedStateStore = cachedStateStore(store); + cachingEnabled = ((WrappedStateStore) store).setFlushListener(flushListener, sendOldValues); this.context = context; - if (this.cachedStateStore != null) { - cachedStateStore.setFlushListener(flushListener, sendOldValues); - } - } - - private CachedStateStore cachedStateStore(final StateStore store) { - if (store instanceof CachedStateStore) { - return (CachedStateStore) store; - } else if (store instanceof WrappedStateStore) { - StateStore wrapped = ((WrappedStateStore) store).wrapped(); - - while (wrapped instanceof WrappedStateStore && !(wrapped instanceof CachedStateStore)) { - wrapped = ((WrappedStateStore) wrapped).wrapped(); - } - - if (!(wrapped instanceof CachedStateStore)) { - return null; - } - - return (CachedStateStore) wrapped; - } - return null; } public void maybeForward(final K key, final V newValue, final V oldValue) { - if (cachedStateStore == null) { - context.forward(key, new Change<>(newValue, oldValue)); + if (cachingEnabled) { + return; } + context.forward(key, new Change<>(newValue, oldValue)); } } diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java index af3e7d1c120d3..c1c3a6064794e 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java @@ -204,7 +204,9 @@ public Cancellable schedule(final Duration interval, return schedule(ApiUtils.validateMillisecondDuration(interval, msgPrefix), type, callback); } - private abstract static class StateStoreReadOnlyDecorator extends WrappedStateStore { + private abstract static class StateStoreReadOnlyDecorator + extends WrappedStateStore { + static final String ERROR_MESSAGE = "Global store is read only"; private StateStoreReadOnlyDecorator(final T inner) { @@ -229,7 +231,7 @@ public void close() { } private static class KeyValueStoreReadOnlyDecorator - extends StateStoreReadOnlyDecorator> + extends StateStoreReadOnlyDecorator, K, V> implements KeyValueStore { private KeyValueStoreReadOnlyDecorator(final KeyValueStore inner) { @@ -281,7 +283,7 @@ public V delete(final K key) { } private static class WindowStoreReadOnlyDecorator - extends StateStoreReadOnlyDecorator> + extends StateStoreReadOnlyDecorator, K, V> implements WindowStore { private WindowStoreReadOnlyDecorator(final WindowStore inner) { @@ -338,7 +340,7 @@ public KeyValueIterator, V> fetchAll(final long timeFrom, } private static class SessionStoreReadOnlyDecorator - extends StateStoreReadOnlyDecorator> + extends StateStoreReadOnlyDecorator, K, AGG> implements SessionStore { private SessionStoreReadOnlyDecorator(final SessionStore inner) { @@ -388,7 +390,9 @@ public KeyValueIterator, AGG> fetch(final K from, } } - private abstract static class StateStoreReadWriteDecorator extends WrappedStateStore { + private abstract static class StateStoreReadWriteDecorator + extends WrappedStateStore { + static final String ERROR_MESSAGE = "This method may only be called by Kafka Streams"; private StateStoreReadWriteDecorator(final T inner) { @@ -408,7 +412,7 @@ public void close() { } private static class KeyValueStoreReadWriteDecorator - extends StateStoreReadWriteDecorator> + extends StateStoreReadWriteDecorator, K, V> implements KeyValueStore { private KeyValueStoreReadWriteDecorator(final KeyValueStore inner) { @@ -460,7 +464,7 @@ public V delete(final K key) { } private static class WindowStoreReadWriteDecorator - extends StateStoreReadWriteDecorator> + extends StateStoreReadWriteDecorator, K, V> implements WindowStore { private WindowStoreReadWriteDecorator(final WindowStore inner) { @@ -517,7 +521,7 @@ public KeyValueIterator, V> fetchAll(final long timeFrom, } private static class SessionStoreReadWriteDecorator - extends StateStoreReadWriteDecorator> + extends StateStoreReadWriteDecorator, K, AGG> implements SessionStore { private SessionStoreReadWriteDecorator(final SessionStore inner) { diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachedStateStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachedStateStore.java index 2f9b0d1a2d2dd..aec1d5db27f77 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachedStateStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachedStateStore.java @@ -23,6 +23,6 @@ public interface CachedStateStore { * @param listener * @param sendOldValues */ - void setFlushListener(final CacheFlushListener listener, - final boolean sendOldValues); + boolean setFlushListener(final CacheFlushListener listener, + final boolean sendOldValues); } diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java index 4ccb2a0273b9f..9632385258b56 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java @@ -16,17 +16,14 @@ */ package org.apache.kafka.streams.state.internals; -import org.apache.kafka.common.serialization.Serde; import org.apache.kafka.common.utils.Bytes; import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.processor.ProcessorContext; import org.apache.kafka.streams.processor.StateStore; import org.apache.kafka.streams.processor.internals.InternalProcessorContext; import org.apache.kafka.streams.processor.internals.ProcessorRecordContext; -import org.apache.kafka.streams.processor.internals.ProcessorStateManager; import org.apache.kafka.streams.state.KeyValueIterator; import org.apache.kafka.streams.state.KeyValueStore; -import org.apache.kafka.streams.state.StateSerdes; import java.util.List; import java.util.Objects; @@ -34,25 +31,20 @@ import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; -class CachingKeyValueStore extends WrappedStateStore> implements KeyValueStore, CachedStateStore { +class CachingKeyValueStore + extends WrappedStateStore, byte[], byte[]> + implements KeyValueStore, CachedStateStore { - private final Serde keySerde; - private final Serde valueSerde; - private CacheFlushListener flushListener; + private CacheFlushListener flushListener; private boolean sendOldValues; private String cacheName; private ThreadCache cache; private InternalProcessorContext context; - private StateSerdes serdes; private Thread streamThread; private final ReadWriteLock lock = new ReentrantReadWriteLock(); - CachingKeyValueStore(final KeyValueStore underlying, - final Serde keySerde, - final Serde valueSerde) { + CachingKeyValueStore(final KeyValueStore underlying) { super(underlying); - this.keySerde = keySerde; - this.valueSerde = valueSerde; } @Override @@ -68,9 +60,6 @@ public void init(final ProcessorContext context, @SuppressWarnings("unchecked") private void initInternal(final ProcessorContext context) { this.context = (InternalProcessorContext) context; - this.serdes = new StateSerdes<>(ProcessorStateManager.storeChangelogTopic(context.applicationId(), name()), - keySerde == null ? (Serde) context.keySerde() : keySerde, - valueSerde == null ? (Serde) context.valueSerde() : valueSerde); this.cache = this.context.getCache(); this.cacheName = ThreadCache.nameSpaceFromTaskIdAndStore(context.taskId().toString(), name()); @@ -90,9 +79,6 @@ private void putAndMaybeForward(final ThreadCache.DirtyEntry entry, // this is an optimization: if this key did not exist in underlying store and also not in the cache, // we can skip flushing to downstream as well as writing to underlying store if (newValueBytes != null || oldValueBytes != null) { - final K key = serdes.keyFrom(entry.key().get()); - final V newValue = newValueBytes != null ? serdes.valueFrom(newValueBytes) : null; - final V oldValue = sendOldValues && oldValueBytes != null ? serdes.valueFrom(oldValueBytes) : null; // we need to get the old values if needed, and then put to store, and then flush wrapped().put(entry.key(), entry.newValue()); @@ -100,9 +86,9 @@ private void putAndMaybeForward(final ThreadCache.DirtyEntry entry, context.setRecordContext(entry.entry().context()); try { flushListener.apply( - key, - newValue, - oldValue, + entry.key().get(), + newValueBytes, + sendOldValues ? oldValueBytes : null, entry.entry().context().timestamp()); } finally { context.setRecordContext(current); @@ -113,37 +99,93 @@ private void putAndMaybeForward(final ThreadCache.DirtyEntry entry, } } - public void setFlushListener(final CacheFlushListener flushListener, - final boolean sendOldValues) { - + @Override + public boolean setFlushListener(final CacheFlushListener flushListener, + final boolean sendOldValues) { this.flushListener = flushListener; this.sendOldValues = sendOldValues; + + return true; } @Override - public void flush() { + public void put(final Bytes key, + final byte[] value) { + Objects.requireNonNull(key, "key cannot be null"); + validateStoreOpen(); lock.writeLock().lock(); try { - cache.flush(cacheName); - super.flush(); + // for null bytes, we still put it into cache indicating tombstones + putInternal(key, value); } finally { lock.writeLock().unlock(); } } + private void putInternal(final Bytes key, + final byte[] value) { + cache.put( + cacheName, + key, + new LRUCacheEntry( + value, + context.headers(), + true, + context.offset(), + context.timestamp(), + context.partition(), + context.topic())); + } + @Override - public void close() { + public byte[] putIfAbsent(final Bytes key, + final byte[] value) { + Objects.requireNonNull(key, "key cannot be null"); + validateStoreOpen(); + lock.writeLock().lock(); try { - flush(); + final byte[] v = getInternal(key); + if (v == null) { + putInternal(key, value); + } + return v; } finally { - try { - super.close(); - } finally { - cache.close(cacheName); + lock.writeLock().unlock(); + } + } + + @Override + public void putAll(final List> entries) { + validateStoreOpen(); + lock.writeLock().lock(); + try { + for (final KeyValue entry : entries) { + Objects.requireNonNull(entry.key, "key cannot be null"); + put(entry.key, entry.value); } + } finally { + lock.writeLock().unlock(); } } + @Override + public byte[] delete(final Bytes key) { + Objects.requireNonNull(key, "key cannot be null"); + validateStoreOpen(); + lock.writeLock().lock(); + try { + return deleteInternal(key); + } finally { + lock.writeLock().unlock(); + } + } + + private byte[] deleteInternal(final Bytes key) { + final byte[] v = getInternal(key); + putInternal(key, null); + return v; + } + @Override public byte[] get(final Bytes key) { Objects.requireNonNull(key, "key cannot be null"); @@ -212,80 +254,26 @@ public long approximateNumEntries() { } @Override - public void put(final Bytes key, - final byte[] value) { - Objects.requireNonNull(key, "key cannot be null"); - validateStoreOpen(); + public void flush() { lock.writeLock().lock(); try { - // for null bytes, we still put it into cache indicating tombstones - putInternal(key, value); + cache.flush(cacheName); + super.flush(); } finally { lock.writeLock().unlock(); } } - private void putInternal(final Bytes key, - final byte[] value) { - cache.put( - cacheName, - key, - new LRUCacheEntry( - value, - context.headers(), - true, - context.offset(), - context.timestamp(), - context.partition(), - context.topic())); - } - @Override - public byte[] putIfAbsent(final Bytes key, - final byte[] value) { - Objects.requireNonNull(key, "key cannot be null"); - validateStoreOpen(); - lock.writeLock().lock(); + public void close() { try { - final byte[] v = getInternal(key); - if (v == null) { - putInternal(key, value); - } - return v; + flush(); } finally { - lock.writeLock().unlock(); - } - } - - @Override - public void putAll(final List> entries) { - validateStoreOpen(); - lock.writeLock().lock(); - try { - for (final KeyValue entry : entries) { - Objects.requireNonNull(entry.key, "key cannot be null"); - put(entry.key, entry.value); + try { + super.close(); + } finally { + cache.close(cacheName); } - } finally { - lock.writeLock().unlock(); } } - - @Override - public byte[] delete(final Bytes key) { - Objects.requireNonNull(key, "key cannot be null"); - validateStoreOpen(); - lock.writeLock().lock(); - try { - return deleteInternal(key); - } finally { - lock.writeLock().unlock(); - } - } - - private byte[] deleteInternal(final Bytes key) { - final byte[] v = getInternal(key); - putInternal(key, null); - return v; - } } diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingSessionStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingSessionStore.java index 67a1588a6b0ad..2fad28d3c2299 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingSessionStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingSessionStore.java @@ -16,47 +16,38 @@ */ package org.apache.kafka.streams.state.internals; -import org.apache.kafka.common.serialization.Serde; import org.apache.kafka.common.utils.Bytes; import org.apache.kafka.streams.kstream.Windowed; import org.apache.kafka.streams.processor.ProcessorContext; import org.apache.kafka.streams.processor.StateStore; import org.apache.kafka.streams.processor.internals.InternalProcessorContext; import org.apache.kafka.streams.processor.internals.ProcessorRecordContext; -import org.apache.kafka.streams.processor.internals.ProcessorStateManager; import org.apache.kafka.streams.state.KeyValueIterator; import org.apache.kafka.streams.state.SessionStore; -import org.apache.kafka.streams.state.StateSerdes; import java.util.Objects; -class CachingSessionStore extends WrappedStateStore> implements SessionStore, CachedStateStore, AGG> { +class CachingSessionStore + extends WrappedStateStore, byte[], byte[]> + implements SessionStore, CachedStateStore { private final SessionKeySchema keySchema; - private final Serde keySerde; - private final Serde aggSerde; private final SegmentedCacheFunction cacheFunction; private String cacheName; private ThreadCache cache; - private StateSerdes serdes; private InternalProcessorContext context; - private CacheFlushListener, AGG> flushListener; + private CacheFlushListener flushListener; private boolean sendOldValues; - private String topic; CachingSessionStore(final SessionStore bytesStore, - final Serde keySerde, - final Serde aggSerde, final long segmentInterval) { super(bytesStore); - this.keySerde = keySerde; - this.aggSerde = aggSerde; this.keySchema = new SessionKeySchema(); this.cacheFunction = new SegmentedCacheFunction(keySchema, segmentInterval); } + @Override public void init(final ProcessorContext context, final StateStore root) { - topic = ProcessorStateManager.storeChangelogTopic(context.applicationId(), root.name()); initInternal((InternalProcessorContext) context); super.init(context, root); } @@ -65,11 +56,6 @@ public void init(final ProcessorContext context, final StateStore root) { private void initInternal(final InternalProcessorContext context) { this.context = context; - serdes = new StateSerdes<>( - topic, - keySerde == null ? (Serde) context.keySerde() : keySerde, - aggSerde == null ? (Serde) context.valueSerde() : aggSerde); - cacheName = context.taskId() + "-" + name(); cache = context.getCache(); cache.addDirtyEntryFlushListener(cacheName, entries -> { @@ -79,6 +65,69 @@ private void initInternal(final InternalProcessorContext context) { }); } + private void putAndMaybeForward(final ThreadCache.DirtyEntry entry, final InternalProcessorContext context) { + final Bytes binaryKey = cacheFunction.key(entry.key()); + final Windowed bytesKey = SessionKeySchema.from(binaryKey); + if (flushListener != null) { + final byte[] newValueBytes = entry.newValue(); + final byte[] oldValueBytes = newValueBytes == null || sendOldValues ? + wrapped().fetchSession(bytesKey.key(), bytesKey.window().start(), bytesKey.window().end()) : null; + + // this is an optimization: if this key did not exist in underlying store and also not in the cache, + // we can skip flushing to downstream as well as writing to underlying store + if (newValueBytes != null || oldValueBytes != null) { + // we need to get the old values if needed, and then put to store, and then flush + wrapped().put(bytesKey, entry.newValue()); + + final ProcessorRecordContext current = context.recordContext(); + context.setRecordContext(entry.entry().context()); + try { + flushListener.apply( + binaryKey.get(), + newValueBytes, + sendOldValues ? oldValueBytes : null, + entry.entry().context().timestamp()); + } finally { + context.setRecordContext(current); + } + } + } else { + wrapped().put(bytesKey, entry.newValue()); + } + } + + @Override + public boolean setFlushListener(final CacheFlushListener flushListener, + final boolean sendOldValues) { + this.flushListener = flushListener; + this.sendOldValues = sendOldValues; + + return true; + } + + @Override + public void put(final Windowed key, final byte[] value) { + validateStoreOpen(); + final Bytes binaryKey = SessionKeySchema.toBinary(key); + final LRUCacheEntry entry = + new LRUCacheEntry( + value, + context.headers(), + true, + context.offset(), + context.timestamp(), + context.partition(), + context.topic()); + cache.put(cacheName, cacheFunction.cacheKey(binaryKey), entry); + } + + @Override + public void remove(final Windowed sessionKey) { + validateStoreOpen(); + put(sessionKey, null); + } + + @Override public KeyValueIterator, byte[]> findSessions(final Bytes key, final long earliestSessionEndTime, final long latestSessionStartTime) { @@ -120,28 +169,6 @@ public KeyValueIterator, byte[]> findSessions(final Bytes keyFro return new MergedSortedCacheSessionStoreIterator(filteredCacheIterator, storeIterator, cacheFunction); } - @Override - public void remove(final Windowed sessionKey) { - validateStoreOpen(); - put(sessionKey, null); - } - - @Override - public void put(final Windowed key, final byte[] value) { - validateStoreOpen(); - final Bytes binaryKey = SessionKeySchema.toBinary(key); - final LRUCacheEntry entry = - new LRUCacheEntry( - value, - context.headers(), - true, - context.offset(), - context.timestamp(), - context.partition(), - context.topic()); - cache.put(cacheName, cacheFunction.cacheKey(binaryKey), entry); - } - @Override public byte[] fetchSession(final Bytes key, final long startTime, final long endTime) { Objects.requireNonNull(key, "key cannot be null"); @@ -173,40 +200,6 @@ public KeyValueIterator, byte[]> fetch(final Bytes from, final B return findSessions(from, to, 0, Long.MAX_VALUE); } - private void putAndMaybeForward(final ThreadCache.DirtyEntry entry, final InternalProcessorContext context) { - final Bytes binaryKey = cacheFunction.key(entry.key()); - final Windowed bytesKey = SessionKeySchema.from(binaryKey); - if (flushListener != null) { - final byte[] newValueBytes = entry.newValue(); - final byte[] oldValueBytes = newValueBytes == null || sendOldValues ? - wrapped().fetchSession(bytesKey.key(), bytesKey.window().start(), bytesKey.window().end()) : null; - - // this is an optimization: if this key did not exist in underlying store and also not in the cache, - // we can skip flushing to downstream as well as writing to underlying store - if (newValueBytes != null || oldValueBytes != null) { - final Windowed key = SessionKeySchema.from(bytesKey, serdes.keyDeserializer(), topic); - final AGG newValue = newValueBytes != null ? serdes.valueFrom(newValueBytes) : null; - final AGG oldValue = sendOldValues && oldValueBytes != null ? serdes.valueFrom(oldValueBytes) : null; - // we need to get the old values if needed, and then put to store, and then flush - wrapped().put(bytesKey, entry.newValue()); - - final ProcessorRecordContext current = context.recordContext(); - context.setRecordContext(entry.entry().context()); - try { - flushListener.apply( - key, - newValue, - oldValue, - entry.entry().context().timestamp()); - } finally { - context.setRecordContext(current); - } - } - } else { - wrapped().put(bytesKey, entry.newValue()); - } - } - public void flush() { cache.flush(cacheName); super.flush(); @@ -217,11 +210,4 @@ public void close() { cache.close(cacheName); super.close(); } - - public void setFlushListener(final CacheFlushListener, AGG> flushListener, - final boolean sendOldValues) { - this.flushListener = flushListener; - this.sendOldValues = sendOldValues; - } - } diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java index bc82cc48f0376..6f8424b4b6f13 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java @@ -16,7 +16,6 @@ */ package org.apache.kafka.streams.state.internals; -import org.apache.kafka.common.serialization.Serde; import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.common.utils.Bytes; import org.apache.kafka.streams.kstream.Windowed; @@ -30,32 +29,26 @@ import org.apache.kafka.streams.state.WindowStore; import org.apache.kafka.streams.state.WindowStoreIterator; -class CachingWindowStore extends WrappedStateStore> implements WindowStore, CachedStateStore, V> { +class CachingWindowStore + extends WrappedStateStore, byte[], byte[]> + implements WindowStore, CachedStateStore { - private final Serde keySerde; - private final Serde valueSerde; private final long windowSize; private final SegmentedBytesStore.KeySchema keySchema = new WindowKeySchema(); - private String name; private ThreadCache cache; private boolean sendOldValues; - private StateSerdes serdes; private InternalProcessorContext context; private StateSerdes bytesSerdes; - private CacheFlushListener, V> flushListener; + private CacheFlushListener flushListener; private final SegmentedCacheFunction cacheFunction; CachingWindowStore(final WindowStore underlying, - final Serde keySerde, - final Serde valueSerde, final long windowSize, final long segmentInterval) { super(underlying); - this.keySerde = keySerde; - this.valueSerde = valueSerde; this.windowSize = windowSize; this.cacheFunction = new SegmentedCacheFunction(keySchema, segmentInterval); } @@ -70,9 +63,6 @@ public void init(final ProcessorContext context, final StateStore root) { private void initInternal(final InternalProcessorContext context) { this.context = context; final String topic = ProcessorStateManager.storeChangelogTopic(context.applicationId(), name()); - serdes = new StateSerdes<>(topic, - keySerde == null ? (Serde) context.keySerde() : keySerde, - valueSerde == null ? (Serde) context.valueSerde() : valueSerde); bytesSerdes = new StateSerdes<>(topic, Serdes.Bytes(), @@ -100,9 +90,6 @@ private void putAndMaybeForward(final ThreadCache.DirtyEntry entry, // this is an optimization: if this key did not exist in underlying store and also not in the cache, // we can skip flushing to downstream as well as writing to underlying store if (newValueBytes != null || oldValueBytes != null) { - final Windowed windowedKey = WindowKeySchema.fromStoreKey(windowedKeyBytes, serdes.keyDeserializer(), serdes.topic()); - final V newValue = newValueBytes != null ? serdes.valueFrom(newValueBytes) : null; - final V oldValue = sendOldValues && oldValueBytes != null ? serdes.valueFrom(oldValueBytes) : null; // we need to get the old values if needed, and then put to store, and then flush wrapped().put(key, entry.newValue(), windowStartTimestamp); @@ -110,9 +97,9 @@ private void putAndMaybeForward(final ThreadCache.DirtyEntry entry, context.setRecordContext(entry.entry().context()); try { flushListener.apply( - windowedKey, - newValue, - oldValue, + binaryWindowKey, + newValueBytes, + sendOldValues ? oldValueBytes : null, entry.entry().context().timestamp()); } finally { context.setRecordContext(current); @@ -123,24 +110,13 @@ private void putAndMaybeForward(final ThreadCache.DirtyEntry entry, } } - public void setFlushListener(final CacheFlushListener, V> flushListener, - final boolean sendOldValues) { - + @Override + public boolean setFlushListener(final CacheFlushListener flushListener, + final boolean sendOldValues) { this.flushListener = flushListener; this.sendOldValues = sendOldValues; - } - @Override - public synchronized void flush() { - cache.flush(name); - wrapped().flush(); - } - - @Override - public void close() { - flush(); - cache.close(name); - wrapped().close(); + return true; } @Override @@ -239,22 +215,6 @@ public KeyValueIterator, byte[]> fetch(final Bytes from, final B ); } - @Override - public KeyValueIterator, byte[]> all() { - validateStoreOpen(); - - final KeyValueIterator, byte[]> underlyingIterator = wrapped().all(); - final ThreadCache.MemoryLRUCacheBytesIterator cacheIterator = cache.all(name); - - return new MergedSortedCacheWindowStoreKeyValueIterator( - cacheIterator, - underlyingIterator, - bytesSerdes, - windowSize, - cacheFunction - ); - } - @SuppressWarnings("deprecation") @Override public KeyValueIterator, byte[]> fetchAll(final long timeFrom, final long timeTo) { @@ -275,4 +235,33 @@ public KeyValueIterator, byte[]> fetchAll(final long timeFrom, f cacheFunction ); } + + @Override + public KeyValueIterator, byte[]> all() { + validateStoreOpen(); + + final KeyValueIterator, byte[]> underlyingIterator = wrapped().all(); + final ThreadCache.MemoryLRUCacheBytesIterator cacheIterator = cache.all(name); + + return new MergedSortedCacheWindowStoreKeyValueIterator( + cacheIterator, + underlyingIterator, + bytesSerdes, + windowSize, + cacheFunction + ); + } + + @Override + public synchronized void flush() { + cache.flush(name); + wrapped().flush(); + } + + @Override + public void close() { + flush(); + cache.close(name); + wrapped().close(); + } } diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStore.java index aa931bf37b1a0..f4fda6aef9710 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStore.java @@ -28,7 +28,10 @@ import java.util.List; -public class ChangeLoggingKeyValueBytesStore extends WrappedStateStore> implements KeyValueStore { +public class ChangeLoggingKeyValueBytesStore + extends WrappedStateStore, byte[], byte[]> + implements KeyValueStore { + private StoreChangeLogger changeLogger; ChangeLoggingKeyValueBytesStore(final KeyValueStore inner) { diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingSessionBytesStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingSessionBytesStore.java index 8fe8609de1ea5..361f8a53b071b 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingSessionBytesStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingSessionBytesStore.java @@ -30,7 +30,9 @@ * Simple wrapper around a {@link SessionStore} to support writing * updates to a changelog */ -class ChangeLoggingSessionBytesStore extends WrappedStateStore> implements SessionStore { +class ChangeLoggingSessionBytesStore + extends WrappedStateStore, byte[], byte[]> + implements SessionStore { private StoreChangeLogger changeLogger; diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingWindowBytesStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingWindowBytesStore.java index 3cddb33930acf..7f7612e349a80 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingWindowBytesStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingWindowBytesStore.java @@ -31,7 +31,9 @@ * Simple wrapper around a {@link WindowStore} to support writing * updates to a changelog */ -class ChangeLoggingWindowBytesStore extends WrappedStateStore> implements WindowStore { +class ChangeLoggingWindowBytesStore + extends WrappedStateStore, byte[], byte[]> + implements WindowStore { private final boolean retainDuplicates; private StoreChangeLogger changeLogger; diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/KeyValueStoreBuilder.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/KeyValueStoreBuilder.java index 31169d20fe4b6..2071ca786292b 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/KeyValueStoreBuilder.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/KeyValueStoreBuilder.java @@ -51,7 +51,7 @@ private KeyValueStore maybeWrapCaching(final KeyValueStore(inner, keySerde, valueSerde); + return new CachingKeyValueStore(inner); } private KeyValueStore maybeWrapLogging(final KeyValueStore inner) { diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java index 0c0860604de51..c21568b6c5fff 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java @@ -45,7 +45,9 @@ * @param * @param */ -public class MeteredKeyValueStore extends WrappedStateStore> implements KeyValueStore { +public class MeteredKeyValueStore + extends WrappedStateStore, K, V> + implements KeyValueStore { private final Serde keySerde; private final Serde valueSerde; @@ -115,15 +117,22 @@ public void init(final ProcessorContext context, } } + @SuppressWarnings("unchecked") @Override - public void close() { - super.close(); - metrics.removeAllStoreLevelSensors(taskName, name()); - } - - @Override - public long approximateNumEntries() { - return wrapped().approximateNumEntries(); + public boolean setFlushListener(final CacheFlushListener listener, + final boolean sendOldValues) { + final KeyValueStore wrapped = wrapped(); + if (wrapped instanceof CachedStateStore) { + return ((CachedStateStore) wrapped).setFlushListener( + (key, newValue, oldValue, timestamp) -> listener.apply( + serdes.keyFrom(key), + newValue != null ? serdes.valueFrom(newValue) : null, + oldValue != null ? serdes.valueFrom(oldValue) : null, + timestamp + ), + sendOldValues); + } + return false; } @Override @@ -225,6 +234,17 @@ public void flush() { } } + @Override + public long approximateNumEntries() { + return wrapped().approximateNumEntries(); + } + + @Override + public void close() { + super.close(); + metrics.removeAllStoreLevelSensors(taskName, name()); + } + private interface Action { V execute(); } diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java index 0db67c3bcc72a..4631601b12a9f 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java @@ -36,7 +36,10 @@ import static org.apache.kafka.common.metrics.Sensor.RecordingLevel.DEBUG; import static org.apache.kafka.streams.state.internals.metrics.Sensors.createTaskAndStoreLatencyAndThroughputSensors; -public class MeteredSessionStore extends WrappedStateStore> implements SessionStore { +public class MeteredSessionStore + extends WrappedStateStore, Windowed, V> + implements SessionStore { + private final String metricScope; private final Serde keySerde; private final Serde valueSerde; @@ -96,86 +99,55 @@ public void init(final ProcessorContext context, } } + @SuppressWarnings("unchecked") @Override - public void close() { - super.close(); - metrics.removeAllStoreLevelSensors(taskName, name()); - } - - - @Override - public KeyValueIterator, V> findSessions(final K key, - final long earliestSessionEndTime, - final long latestSessionStartTime) { - Objects.requireNonNull(key, "key cannot be null"); - final Bytes bytesKey = keyBytes(key); - return new MeteredWindowedKeyValueIterator<>( - wrapped().findSessions( - bytesKey, - earliestSessionEndTime, - latestSessionStartTime), - fetchTime, - metrics, - serdes, - time); - } - - @Override - public KeyValueIterator, V> findSessions(final K keyFrom, - final K keyTo, - final long earliestSessionEndTime, - final long latestSessionStartTime) { - Objects.requireNonNull(keyFrom, "keyFrom cannot be null"); - Objects.requireNonNull(keyTo, "keyTo cannot be null"); - final Bytes bytesKeyFrom = keyBytes(keyFrom); - final Bytes bytesKeyTo = keyBytes(keyTo); - return new MeteredWindowedKeyValueIterator<>( - wrapped().findSessions( - bytesKeyFrom, - bytesKeyTo, - earliestSessionEndTime, - latestSessionStartTime), - fetchTime, - metrics, - serdes, - time); + public boolean setFlushListener(final CacheFlushListener, V> listener, + final boolean sendOldValues) { + final SessionStore wrapped = wrapped(); + if (wrapped instanceof CachedStateStore) { + return ((CachedStateStore) wrapped).setFlushListener( + (key, newValue, oldValue, timestamp) -> listener.apply( + SessionKeySchema.from(key, serdes.keyDeserializer(), serdes.topic()), + newValue != null ? serdes.valueFrom(newValue) : null, + oldValue != null ? serdes.valueFrom(oldValue) : null, + timestamp + ), + sendOldValues); + } + return false; } @Override - public void remove(final Windowed sessionKey) { + public void put(final Windowed sessionKey, + final V aggregate) { Objects.requireNonNull(sessionKey, "sessionKey can't be null"); final long startNs = time.nanoseconds(); try { final Bytes key = keyBytes(sessionKey.key()); - wrapped().remove(new Windowed<>(key, sessionKey.window())); + wrapped().put(new Windowed<>(key, sessionKey.window()), serdes.rawValue(aggregate)); } catch (final ProcessorStateException e) { - final String message = String.format(e.getMessage(), sessionKey.key()); + final String message = String.format(e.getMessage(), sessionKey.key(), aggregate); throw new ProcessorStateException(message, e); } finally { - metrics.recordLatency(removeTime, startNs, time.nanoseconds()); + metrics.recordLatency(putTime, startNs, time.nanoseconds()); } } @Override - public void put(final Windowed sessionKey, - final V aggregate) { + public void remove(final Windowed sessionKey) { Objects.requireNonNull(sessionKey, "sessionKey can't be null"); final long startNs = time.nanoseconds(); try { final Bytes key = keyBytes(sessionKey.key()); - wrapped().put(new Windowed<>(key, sessionKey.window()), serdes.rawValue(aggregate)); + wrapped().remove(new Windowed<>(key, sessionKey.window())); } catch (final ProcessorStateException e) { - final String message = String.format(e.getMessage(), sessionKey.key(), aggregate); + final String message = String.format(e.getMessage(), sessionKey.key()); throw new ProcessorStateException(message, e); } finally { - metrics.recordLatency(putTime, startNs, time.nanoseconds()); + metrics.recordLatency(removeTime, startNs, time.nanoseconds()); } } - private Bytes keyBytes(final K key) { - return Bytes.wrap(serdes.rawKey(key)); - } - @Override public V fetchSession(final K key, final long startTime, final long endTime) { Objects.requireNonNull(key, "key cannot be null"); @@ -205,6 +177,44 @@ public KeyValueIterator, V> fetch(final K from, return findSessions(from, to, 0, Long.MAX_VALUE); } + @Override + public KeyValueIterator, V> findSessions(final K key, + final long earliestSessionEndTime, + final long latestSessionStartTime) { + Objects.requireNonNull(key, "key cannot be null"); + final Bytes bytesKey = keyBytes(key); + return new MeteredWindowedKeyValueIterator<>( + wrapped().findSessions( + bytesKey, + earliestSessionEndTime, + latestSessionStartTime), + fetchTime, + metrics, + serdes, + time); + } + + @Override + public KeyValueIterator, V> findSessions(final K keyFrom, + final K keyTo, + final long earliestSessionEndTime, + final long latestSessionStartTime) { + Objects.requireNonNull(keyFrom, "keyFrom cannot be null"); + Objects.requireNonNull(keyTo, "keyTo cannot be null"); + final Bytes bytesKeyFrom = keyBytes(keyFrom); + final Bytes bytesKeyTo = keyBytes(keyTo); + return new MeteredWindowedKeyValueIterator<>( + wrapped().findSessions( + bytesKeyFrom, + bytesKeyTo, + earliestSessionEndTime, + latestSessionStartTime), + fetchTime, + metrics, + serdes, + time); + } + @Override public void flush() { final long startNs = time.nanoseconds(); @@ -214,4 +224,14 @@ public void flush() { metrics.recordLatency(flushTime, startNs, time.nanoseconds()); } } + + @Override + public void close() { + super.close(); + metrics.removeAllStoreLevelSensors(taskName, name()); + } + + private Bytes keyBytes(final K key) { + return Bytes.wrap(serdes.rawKey(key)); + } } diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java index 2bbfc4524cca5..c040e672399f6 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java @@ -36,8 +36,11 @@ import static org.apache.kafka.common.metrics.Sensor.RecordingLevel.DEBUG; import static org.apache.kafka.streams.state.internals.metrics.Sensors.createTaskAndStoreLatencyAndThroughputSensors; -public class MeteredWindowStore extends WrappedStateStore> implements WindowStore { +public class MeteredWindowStore + extends WrappedStateStore, Windowed, V> + implements WindowStore { + private final long windowSizeMs; private final String metricScope; private final Time time; private final Serde keySerde; @@ -51,11 +54,13 @@ public class MeteredWindowStore extends WrappedStateStore inner, + final long windowSizeMs, final String metricScope, final Time time, final Serde keySerde, final Serde valueSerde) { super(inner); + this.windowSizeMs = windowSizeMs; this.metricScope = metricScope; this.time = time; this.keySerde = keySerde; @@ -96,10 +101,22 @@ public void init(final ProcessorContext context, } } + @SuppressWarnings("unchecked") @Override - public void close() { - super.close(); - metrics.removeAllStoreLevelSensors(taskName, name()); + public boolean setFlushListener(final CacheFlushListener, V> listener, + final boolean sendOldValues) { + final WindowStore wrapped = wrapped(); + if (wrapped instanceof CachedStateStore) { + return ((CachedStateStore) wrapped).setFlushListener( + (key, newValue, oldValue, timestamp) -> listener.apply( + WindowKeySchema.fromStoreKey(key, windowSizeMs, serdes.keyDeserializer(), serdes.topic()), + newValue != null ? serdes.valueFrom(newValue) : null, + oldValue != null ? serdes.valueFrom(oldValue) : null, + timestamp + ), + sendOldValues); + } + return false; } @Override @@ -123,10 +140,6 @@ public void put(final K key, } } - private Bytes keyBytes(final K key) { - return Bytes.wrap(serdes.rawKey(key)); - } - @Override public V fetch(final K key, final long timestamp) { @@ -154,17 +167,14 @@ public WindowStoreIterator fetch(final K key, time); } - @Override - public KeyValueIterator, V> all() { - return new MeteredWindowedKeyValueIterator<>(wrapped().all(), fetchTime, metrics, serdes, time); - } - @SuppressWarnings("deprecation") @Override - public KeyValueIterator, V> fetchAll(final long timeFrom, - final long timeTo) { + public KeyValueIterator, V> fetch(final K from, + final K to, + final long timeFrom, + final long timeTo) { return new MeteredWindowedKeyValueIterator<>( - wrapped().fetchAll(timeFrom, timeTo), + wrapped().fetch(keyBytes(from), keyBytes(to), timeFrom, timeTo), fetchTime, metrics, serdes, @@ -173,18 +183,21 @@ public KeyValueIterator, V> fetchAll(final long timeFrom, @SuppressWarnings("deprecation") @Override - public KeyValueIterator, V> fetch(final K from, - final K to, - final long timeFrom, - final long timeTo) { + public KeyValueIterator, V> fetchAll(final long timeFrom, + final long timeTo) { return new MeteredWindowedKeyValueIterator<>( - wrapped().fetch(keyBytes(from), keyBytes(to), timeFrom, timeTo), + wrapped().fetchAll(timeFrom, timeTo), fetchTime, metrics, serdes, time); } + @Override + public KeyValueIterator, V> all() { + return new MeteredWindowedKeyValueIterator<>(wrapped().all(), fetchTime, metrics, serdes, time); + } + @Override public void flush() { final long startNs = time.nanoseconds(); @@ -195,4 +208,13 @@ public void flush() { } } + @Override + public void close() { + super.close(); + metrics.removeAllStoreLevelSensors(taskName, name()); + } + + private Bytes keyBytes(final K key) { + return Bytes.wrap(serdes.rawKey(key)); + } } diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBSessionStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBSessionStore.java index c9ca423cbbe3d..2f7a211d5266a 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBSessionStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBSessionStore.java @@ -22,7 +22,9 @@ import org.apache.kafka.streams.state.SessionStore; -public class RocksDBSessionStore extends WrappedStateStore implements SessionStore { +public class RocksDBSessionStore + extends WrappedStateStore + implements SessionStore { RocksDBSessionStore(final SegmentedBytesStore bytesStore) { super(bytesStore); diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStore.java index 44c9f79a98057..e621290f20605 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStore.java @@ -24,7 +24,9 @@ import org.apache.kafka.streams.state.WindowStore; import org.apache.kafka.streams.state.WindowStoreIterator; -public class RocksDBWindowStore extends WrappedStateStore implements WindowStore { +public class RocksDBWindowStore + extends WrappedStateStore + implements WindowStore { private final boolean retainDuplicates; private final long windowSize; diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/SessionStoreBuilder.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/SessionStoreBuilder.java index b4338952a667a..a40eab3a86913 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/SessionStoreBuilder.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/SessionStoreBuilder.java @@ -48,10 +48,9 @@ private SessionStore maybeWrapCaching(final SessionStore(inner, - keySerde, - valueSerde, - storeSupplier.segmentIntervalMs()); + return new CachingSessionStore( + inner, + storeSupplier.segmentIntervalMs()); } private SessionStore maybeWrapLogging(final SessionStore inner) { diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowStoreBuilder.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowStoreBuilder.java index 058a2498dc5c1..ea30d69bd03d4 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowStoreBuilder.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowStoreBuilder.java @@ -38,6 +38,7 @@ public WindowStoreBuilder(final WindowBytesStoreSupplier storeSupplier, public WindowStore build() { return new MeteredWindowStore<>( maybeWrapCaching(maybeWrapLogging(storeSupplier.get())), + storeSupplier.windowSize(), storeSupplier.metricsScope(), time, keySerde, @@ -48,10 +49,8 @@ private WindowStore maybeWrapCaching(final WindowStore( + return new CachingWindowStore( inner, - keySerde, - valueSerde, storeSupplier.windowSize(), storeSupplier.segmentIntervalMs()); } diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/WrappedStateStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/WrappedStateStore.java index 65cd48411e0f3..62dc8e059d260 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/WrappedStateStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/WrappedStateStore.java @@ -24,7 +24,8 @@ /** * A storage engine wrapper for utilities like logging, caching, and metering. */ -public abstract class WrappedStateStore implements StateStore { +public abstract class WrappedStateStore implements StateStore, CachedStateStore { + public static boolean isTimestamped(final StateStore stateStore) { if (stateStore instanceof TimestampedBytesStore) { return true; @@ -47,6 +48,16 @@ public void init(final ProcessorContext context, wrapped.init(context, root); } + @SuppressWarnings("unchecked") + @Override + public boolean setFlushListener(final CacheFlushListener listener, + final boolean sendOldValues) { + if (wrapped instanceof CachedStateStore) { + return ((CachedStateStore) wrapped).setFlushListener(listener, sendOldValues); + } + return false; + } + @Override public String name() { return wrapped.name(); diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableAggregateTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableAggregateTest.java index 0452c0672bf5a..2e4f318cc4112 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableAggregateTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableAggregateTest.java @@ -22,17 +22,10 @@ import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.StreamsBuilder; import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster; -import org.apache.kafka.streams.kstream.Aggregator; import org.apache.kafka.streams.kstream.Consumed; -import org.apache.kafka.streams.kstream.ForeachAction; import org.apache.kafka.streams.kstream.Grouped; -import org.apache.kafka.streams.kstream.Initializer; import org.apache.kafka.streams.kstream.KTable; -import org.apache.kafka.streams.kstream.KeyValueMapper; import org.apache.kafka.streams.kstream.Materialized; -import org.apache.kafka.streams.kstream.Reducer; -import org.apache.kafka.streams.kstream.ValueJoiner; -import org.apache.kafka.streams.kstream.ValueMapper; import org.apache.kafka.streams.state.KeyValueStore; import org.apache.kafka.test.MockAggregator; import org.apache.kafka.test.MockInitializer; @@ -45,6 +38,7 @@ import org.junit.Test; import java.io.File; +import java.util.Collections; import java.util.HashMap; import java.util.Map; @@ -76,14 +70,17 @@ public void testAggBasic() { final StreamsBuilder builder = new StreamsBuilder(); final String topic1 = "topic1"; - final KTable table1 = builder.table(topic1, consumed); - final KTable table2 = table1.groupBy(MockMapper.noOpKeyValueMapper(), - stringSerialzied - ).aggregate(MockInitializer.STRING_INIT, + final KTable table2 = table1 + .groupBy( + MockMapper.noOpKeyValueMapper(), + stringSerialzied) + .aggregate( + MockInitializer.STRING_INIT, MockAggregator.TOSTRING_ADDER, MockAggregator.TOSTRING_REMOVER, - Materialized.>as("topic1-Canonized").withValueSerde(stringSerde)); + Materialized.>as("topic1-Canonized") + .withValueSerde(stringSerde)); table2.toStream().process(supplier); @@ -106,7 +103,8 @@ public void testAggBasic() { driver.process(topic1, "C", "8"); driver.flushState(); - assertEquals(asList( + assertEquals( + asList( "A:0+1", "B:0+2", "A:0+1-1+3", @@ -114,7 +112,8 @@ public void testAggBasic() { "C:0+5", "D:0+6", "B:0+2-2+4-4+7", - "C:0+5-5+8"), supplier.theCapturedProcessor().processed); + "C:0+5-5+8"), + supplier.theCapturedProcessor().processed); } @@ -124,12 +123,15 @@ public void testAggCoalesced() { final String topic1 = "topic1"; final KTable table1 = builder.table(topic1, consumed); - final KTable table2 = table1.groupBy(MockMapper.noOpKeyValueMapper(), - stringSerialzied - ).aggregate(MockInitializer.STRING_INIT, - MockAggregator.TOSTRING_ADDER, - MockAggregator.TOSTRING_REMOVER, - Materialized.>as("topic1-Canonized").withValueSerde(stringSerde)); + final KTable table2 = table1 + .groupBy( + MockMapper.noOpKeyValueMapper(), + stringSerialzied) + .aggregate(MockInitializer.STRING_INIT, + MockAggregator.TOSTRING_ADDER, + MockAggregator.TOSTRING_REMOVER, + Materialized.>as("topic1-Canonized") + .withValueSerde(stringSerde)); table2.toStream().process(supplier); @@ -139,8 +141,8 @@ public void testAggCoalesced() { driver.process(topic1, "A", "3"); driver.process(topic1, "A", "4"); driver.flushState(); - assertEquals(asList( - "A:0+4"), supplier.theCapturedProcessor().processed); + + assertEquals(Collections.singletonList("A:0+4"), supplier.theCapturedProcessor().processed); } @@ -150,10 +152,9 @@ public void testAggRepartition() { final String topic1 = "topic1"; final KTable table1 = builder.table(topic1, consumed); - final KTable table2 = table1.groupBy( - new KeyValueMapper>() { - @Override - public KeyValue apply(final String key, final String value) { + final KTable table2 = table1 + .groupBy( + (key, value) -> { switch (key) { case "null": return KeyValue.pair(null, value); @@ -162,14 +163,14 @@ public KeyValue apply(final String key, final String value) { default: return KeyValue.pair(value, value); } - } - }, - stringSerialzied - ) - .aggregate(MockInitializer.STRING_INIT, + }, + stringSerialzied) + .aggregate( + MockInitializer.STRING_INIT, MockAggregator.TOSTRING_ADDER, MockAggregator.TOSTRING_REMOVER, - Materialized.>as("topic1-Canonized").withValueSerde(stringSerde)); + Materialized.>as("topic1-Canonized") + .withValueSerde(stringSerde)); table2.toStream().process(supplier); @@ -192,7 +193,8 @@ public KeyValue apply(final String key, final String value) { driver.process(topic1, "B", "7"); driver.flushState(); - assertEquals(asList( + assertEquals( + asList( "1:0+1", "1:0+1-1", "1:0+1-1+1", @@ -200,11 +202,13 @@ public KeyValue apply(final String key, final String value) { //noop "2:0+2-2", "4:0+4", //noop - "4:0+4-4", "7:0+7" - ), supplier.theCapturedProcessor().processed); + "4:0+4-4", "7:0+7"), + supplier.theCapturedProcessor().processed); } - private void testCountHelper(final StreamsBuilder builder, final String input, final MockProcessorSupplier supplier) { + private void testCountHelper(final StreamsBuilder builder, + final String input, + final MockProcessorSupplier supplier) { driver.setUp(builder, stateDir); driver.process(input, "A", "green"); @@ -219,14 +223,14 @@ private void testCountHelper(final StreamsBuilder builder, final String input, f driver.flushState(); driver.flushState(); - - assertEquals(asList( - "green:1", - "green:2", - "green:1", "blue:1", - "yellow:1", - "green:2" - ), supplier.theCapturedProcessor().processed); + assertEquals( + asList( + "green:1", + "green:2", + "green:1", "blue:1", + "yellow:1", + "green:2"), + supplier.theCapturedProcessor().processed); } @Test @@ -234,11 +238,12 @@ public void testCount() { final StreamsBuilder builder = new StreamsBuilder(); final String input = "count-test-input"; - builder.table(input, consumed) - .groupBy(MockMapper.selectValueKeyValueMapper(), stringSerialzied) - .count(Materialized.>as("count")) - .toStream() - .process(supplier); + builder + .table(input, consumed) + .groupBy(MockMapper.selectValueKeyValueMapper(), stringSerialzied) + .count(Materialized.as("count")) + .toStream() + .process(supplier); testCountHelper(builder, input, supplier); } @@ -248,8 +253,9 @@ public void testCountWithInternalStore() { final StreamsBuilder builder = new StreamsBuilder(); final String input = "count-test-input"; - builder.table(input, consumed) - .groupBy(MockMapper.selectValueKeyValueMapper(), stringSerialzied) + builder + .table(input, consumed) + .groupBy(MockMapper.selectValueKeyValueMapper(), stringSerialzied) .count() .toStream() .process(supplier); @@ -263,9 +269,10 @@ public void testCountCoalesced() { final String input = "count-test-input"; final MockProcessorSupplier supplier = new MockProcessorSupplier<>(); - builder.table(input, consumed) - .groupBy(MockMapper.selectValueKeyValueMapper(), stringSerialzied) - .count(Materialized.>as("count")) + builder + .table(input, consumed) + .groupBy(MockMapper.selectValueKeyValueMapper(), stringSerialzied) + .count(Materialized.as("count")) .toStream() .process(supplier); @@ -280,12 +287,12 @@ public void testCountCoalesced() { driver.process(input, "D", "green"); driver.flushState(); - - assertEquals(asList( - "blue:1", - "yellow:1", - "green:2" - ), proc.processed); + assertEquals( + asList( + "blue:1", + "yellow:1", + "green:2"), + proc.processed); } @Test @@ -294,35 +301,21 @@ public void testRemoveOldBeforeAddNew() { final String input = "count-test-input"; final MockProcessorSupplier supplier = new MockProcessorSupplier<>(); - builder.table(input, consumed) - .groupBy(new KeyValueMapper>() { - - @Override - public KeyValue apply(final String key, final String value) { - return KeyValue.pair(String.valueOf(key.charAt(0)), String.valueOf(key.charAt(1))); - } - }, stringSerialzied) - .aggregate(new Initializer() { - - @Override - public String apply() { - return ""; - } - }, new Aggregator() { - - @Override - public String apply(final String aggKey, final String value, final String aggregate) { - return aggregate + value; - } - }, new Aggregator() { - - @Override - public String apply(final String key, final String value, final String aggregate) { - return aggregate.replaceAll(value, ""); - } - }, Materialized.>as("someStore").withValueSerde(Serdes.String())) - .toStream() - .process(supplier); + builder + .table(input, consumed) + .groupBy( + (key, value) -> KeyValue.pair( + String.valueOf(key.charAt(0)), + String.valueOf(key.charAt(1))), + stringSerialzied) + .aggregate( + () -> "", + (aggKey, value, aggregate) -> aggregate + value, + (key, value, aggregate) -> aggregate.replaceAll(value, ""), + Materialized.>as("someStore") + .withValueSerde(Serdes.String())) + .toStream() + .process(supplier); driver.setUp(builder, stateDir); @@ -337,12 +330,13 @@ public String apply(final String key, final String value, final String aggregate driver.process(input, "12", "C"); driver.flushState(); - assertEquals(asList( - "1:1", - "1:12", - "1:2", - "1:2" - ), proc.processed); + assertEquals( + asList( + "1:1", + "1:12", + "1:2", + "1:2"), + proc.processed); } @Test @@ -356,44 +350,19 @@ public void shouldForwardToCorrectProcessorNodeWhenMultiCacheEvictions() { final KTable one = builder.table(tableOne, consumed); final KTable two = builder.table(tableTwo, Consumed.with(Serdes.Long(), Serdes.String())); + final KTable reduce = two + .groupBy( + (key, value) -> new KeyValue<>(value, key), + Grouped.with(Serdes.String(), Serdes.Long())) + .reduce( + (value1, value2) -> value1 + value2, + (value1, value2) -> value1 - value2, + Materialized.as("reducer-store")); - final KTable reduce = two.groupBy(new KeyValueMapper>() { - @Override - public KeyValue apply(final Long key, final String value) { - return new KeyValue<>(value, key); - } - }, Grouped.with(Serdes.String(), Serdes.Long())) - .reduce(new Reducer() { - @Override - public Long apply(final Long value1, final Long value2) { - return value1 + value2; - } - }, new Reducer() { - @Override - public Long apply(final Long value1, final Long value2) { - return value1 - value2; - } - }, Materialized.>as("reducer-store")); - - reduce.toStream().foreach(new ForeachAction() { - @Override - public void apply(final String key, final Long value) { - reduceResults.put(key, value); - } - }); - - one.leftJoin(reduce, new ValueJoiner() { - @Override - public String apply(final String value1, final Long value2) { - return value1 + ":" + value2; - } - }) - .mapValues(new ValueMapper() { - @Override - public String apply(final String value) { - return value; - } - }); + reduce.toStream().foreach(reduceResults::put); + + one.leftJoin(reduce, (value1, value2) -> value1 + ":" + value2) + .mapValues(value -> value); driver.setUp(builder, stateDir, 111); driver.process(reduceTopic, "1", new Change<>(1L, null)); diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/TupleForwarderTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/TupleForwarderTest.java new file mode 100644 index 0000000000000..68c6eaa7870ef --- /dev/null +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/TupleForwarderTest.java @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.kstream.internals; + +import org.apache.kafka.streams.processor.ProcessorContext; +import org.apache.kafka.streams.processor.StateStore; +import org.apache.kafka.streams.state.internals.WrappedStateStore; +import org.junit.Test; + +import static org.easymock.EasyMock.expect; +import static org.easymock.EasyMock.expectLastCall; +import static org.easymock.EasyMock.mock; +import static org.easymock.EasyMock.replay; +import static org.easymock.EasyMock.verify; + +public class TupleForwarderTest { + + @Test + public void shouldSetFlushListenerOnWrappedStateStore() { + setFlushListener(true); + setFlushListener(false); + } + + private void setFlushListener(final boolean sendOldValues) { + final WrappedStateStore store = mock(WrappedStateStore.class); + final ForwardingCacheFlushListener flushListener = mock(ForwardingCacheFlushListener.class); + + expect(store.setFlushListener(flushListener, sendOldValues)).andReturn(false); + replay(store); + + new TupleForwarder<>(store, null, flushListener, sendOldValues); + + verify(store); + } + + @Test + public void shouldForwardRecordsIfWrappedStateStoreDoesNotCache() { + final WrappedStateStore store = mock(WrappedStateStore.class); + final ProcessorContext context = mock(ProcessorContext.class); + + expect(store.setFlushListener(null, false)).andReturn(false); + context.forward("key", new Change<>("value", "oldValue")); + expectLastCall(); + replay(store, context); + + new TupleForwarder<>(store, context, null, false) + .maybeForward("key", "value", "oldValue"); + + verify(store, context); + } + + @Test + public void shouldNotForwardRecordsIfWrappedStateStoreDoesCache() { + final WrappedStateStore store = mock(WrappedStateStore.class); + final ProcessorContext context = mock(ProcessorContext.class); + + expect(store.setFlushListener(null, false)).andReturn(true); + replay(store, context); + + new TupleForwarder<>(store, context, null, false) + .maybeForward("key", "value", "oldValue"); + + verify(store, context); + } + +} diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImplTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImplTest.java index 5dd5f8651cf9a..c74ad1bb77cbf 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImplTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImplTest.java @@ -239,7 +239,7 @@ public void shouldNotConvertValuesIfInnerStoreDoesNotImplementTimestampedBytesSt stateManager.initialize(); stateManager.register( - new WrappedStateStore>(store1) { + new WrappedStateStore, Object, Object>(store1) { }, stateRestoreCallback ); @@ -267,7 +267,7 @@ public void shouldConvertValuesIfInnerStoreImplementsTimestampedBytesStore() { stateManager.initialize(); stateManager.register( - new WrappedStateStore>(store2) { + new WrappedStateStore, Object, Object>(store2) { }, stateRestoreCallback ); diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractKeyValueStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractKeyValueStoreTest.java index ad87b608aa803..f2f6b8861802c 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractKeyValueStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractKeyValueStoreTest.java @@ -311,12 +311,12 @@ public void shouldNotThrowNullPointerExceptionOnPutIfAbsentNullValue() { @Test(expected = NullPointerException.class) public void shouldThrowNullPointerExceptionOnPutAllNullKey() { - store.putAll(Collections.singletonList(new KeyValue(null, "anyValue"))); + store.putAll(Collections.singletonList(new KeyValue<>(null, "anyValue"))); } @Test public void shouldNotThrowNullPointerExceptionOnPutAllNullKey() { - store.putAll(Collections.singletonList(new KeyValue(1, null))); + store.putAll(Collections.singletonList(new KeyValue<>(1, null))); } @Test(expected = NullPointerException.class) diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingKeyValueStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingKeyValueStoreTest.java index 6c2b7cf4f2b1c..d0a501f24bf7f 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingKeyValueStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingKeyValueStoreTest.java @@ -17,8 +17,9 @@ package org.apache.kafka.streams.state.internals; import org.apache.kafka.common.metrics.Metrics; +import org.apache.kafka.common.serialization.Deserializer; import org.apache.kafka.common.serialization.Serde; -import org.apache.kafka.common.serialization.Serdes; +import org.apache.kafka.common.serialization.StringDeserializer; import org.apache.kafka.common.utils.Bytes; import org.apache.kafka.common.utils.LogContext; import org.apache.kafka.streams.KeyValue; @@ -27,7 +28,6 @@ import org.apache.kafka.streams.processor.ProcessorContext; import org.apache.kafka.streams.processor.internals.MockStreamsMetrics; import org.apache.kafka.streams.processor.internals.ProcessorRecordContext; -import org.apache.kafka.streams.processor.internals.RecordCollector; import org.apache.kafka.streams.state.KeyValueIterator; import org.apache.kafka.streams.state.KeyValueStore; import org.apache.kafka.streams.state.StoreBuilder; @@ -51,13 +51,14 @@ import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; public class CachingKeyValueStoreTest extends AbstractKeyValueStoreTest { private final int maxCacheSizeBytes = 150; private InternalMockProcessorContext context; - private CachingKeyValueStore store; + private CachingKeyValueStore store; private InMemoryKeyValueStore underlyingStore; private ThreadCache cache; private CacheFlushListenerStub cacheFlushListener; @@ -67,14 +68,13 @@ public class CachingKeyValueStoreTest extends AbstractKeyValueStoreTest { public void setUp() { final String storeName = "store"; underlyingStore = new InMemoryKeyValueStore(storeName); - cacheFlushListener = new CacheFlushListenerStub<>(); - store = new CachingKeyValueStore<>(underlyingStore, Serdes.String(), Serdes.String()); + cacheFlushListener = new CacheFlushListenerStub<>(new StringDeserializer(), new StringDeserializer()); + store = new CachingKeyValueStore(underlyingStore); store.setFlushListener(cacheFlushListener, false); cache = new ThreadCache(new LogContext("testCache "), maxCacheSizeBytes, new MockStreamsMetrics(new Metrics())); - context = new InternalMockProcessorContext(null, null, null, (RecordCollector) null, cache); + context = new InternalMockProcessorContext(null, null, null, null, cache); topic = "topic"; - context.setRecordContext( - new ProcessorRecordContext(10, 0, 0, topic, null)); + context.setRecordContext(new ProcessorRecordContext(10, 0, 0, topic, null)); store.init(context, null); } @@ -93,14 +93,16 @@ protected KeyValueStore createKeyValueStore(final ProcessorContext .withCachingEnabled(); final KeyValueStore store = (KeyValueStore) storeBuilder.build(); - final CacheFlushListenerStub cacheFlushListener = new CacheFlushListenerStub<>(); - - final CachedStateStore inner = (CachedStateStore) ((WrappedStateStore) store).wrapped(); - inner.setFlushListener(cacheFlushListener, false); store.init(context, store); return store; } + @Test + public void shouldSetFlushListener() { + assertTrue(store.setFlushListener(null, true)); + assertTrue(store.setFlushListener(null, false)); + } + @Test public void shouldAvoidFlushingDeletionsWithoutDirtyKeys() { final int added = addItemsToCache(); @@ -122,7 +124,7 @@ public void shouldAvoidFlushingDeletionsWithoutDirtyKeys() { public void shouldCloseAfterErrorWithFlush() { try { cache = EasyMock.niceMock(ThreadCache.class); - context = new InternalMockProcessorContext(null, null, null, (RecordCollector) null, cache); + context = new InternalMockProcessorContext(null, null, null, null, cache); context.setRecordContext(new ProcessorRecordContext(10, 0, 0, topic, null)); store.init(context, null); cache.flush("0_0-store"); @@ -328,11 +330,11 @@ public void shouldThrowNullPointerExceptionOnPutIfAbsentWithNullKey() { @Test public void shouldThrowNullPointerExceptionOnPutAllWithNullKey() { final List> entries = new ArrayList<>(); - entries.add(new KeyValue(null, bytesValue("a"))); + entries.add(new KeyValue<>(null, bytesValue("a"))); try { store.putAll(entries); fail("Should have thrown NullPointerException while putAll null key"); - } catch (final NullPointerException e) { + } catch (final NullPointerException expected) { } } @@ -377,15 +379,27 @@ private int addItemsToCache() { return i; } - public static class CacheFlushListenerStub implements CacheFlushListener { + public static class CacheFlushListenerStub implements CacheFlushListener { + final Deserializer keyDeserializer; + final Deserializer valueDesializer; final Map> forwarded = new HashMap<>(); + CacheFlushListenerStub(final Deserializer keyDeserializer, + final Deserializer valueDesializer) { + this.keyDeserializer = keyDeserializer; + this.valueDesializer = valueDesializer; + } + @Override - public void apply(final K key, - final V newValue, - final V oldValue, + public void apply(final byte[] key, + final byte[] newValue, + final byte[] oldValue, final long timestamp) { - forwarded.put(key, new Change<>(newValue, oldValue)); + forwarded.put( + keyDeserializer.deserialize(null, key), + new Change<>( + valueDesializer.deserialize(null, newValue), + valueDesializer.deserialize(null, oldValue))); } } } diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingSessionStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingSessionStoreTest.java index 1cfdcd7202d44..a6b169fad4adc 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingSessionStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingSessionStoreTest.java @@ -17,11 +17,12 @@ package org.apache.kafka.streams.state.internals; import org.apache.kafka.common.metrics.Metrics; -import org.apache.kafka.common.serialization.Serdes; +import org.apache.kafka.common.serialization.StringDeserializer; import org.apache.kafka.common.utils.Bytes; import org.apache.kafka.common.utils.LogContext; import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.errors.InvalidStateStoreException; +import org.apache.kafka.streams.kstream.SessionWindowedDeserializer; import org.apache.kafka.streams.kstream.Windowed; import org.apache.kafka.streams.kstream.internals.Change; import org.apache.kafka.streams.kstream.internals.SessionWindow; @@ -43,16 +44,19 @@ import java.util.Random; import java.util.Set; +import static org.apache.kafka.common.utils.Utils.mkEntry; +import static org.apache.kafka.common.utils.Utils.mkMap; import static org.apache.kafka.common.utils.Utils.mkSet; import static org.apache.kafka.test.StreamsTestUtils.toList; import static org.apache.kafka.test.StreamsTestUtils.verifyKeyValueList; import static org.apache.kafka.test.StreamsTestUtils.verifyWindowedKeyValue; -import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.equalTo; import static org.junit.Assert.assertArrayEquals; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; @SuppressWarnings("PointlessArithmeticExpression") public class CachingSessionStoreTest { @@ -64,7 +68,7 @@ public class CachingSessionStoreTest { private final Bytes keyAA = Bytes.wrap("aa".getBytes()); private final Bytes keyB = Bytes.wrap("b".getBytes()); - private CachingSessionStore cachingStore; + private CachingSessionStore cachingStore; private ThreadCache cache; @Before @@ -72,7 +76,7 @@ public void setUp() { final SessionKeySchema schema = new SessionKeySchema(); final RocksDBSegmentedBytesStore underlying = new RocksDBSegmentedBytesStore("test", "metrics-scope", 0L, SEGMENT_INTERVAL, schema); final RocksDBSessionStore sessionStore = new RocksDBSessionStore(underlying); - cachingStore = new CachingSessionStore<>(sessionStore, Serdes.String(), Serdes.String(), SEGMENT_INTERVAL); + cachingStore = new CachingSessionStore(sessionStore, SEGMENT_INTERVAL); cache = new ThreadCache(new LogContext("testCache "), MAX_CACHE_SIZE_BYTES, new MockStreamsMetrics(new Metrics())); final InternalMockProcessorContext context = new InternalMockProcessorContext(TestUtils.tempDirectory(), null, null, null, cache); context.setRecordContext(new ProcessorRecordContext(DEFAULT_TIMESTAMP, 0, 0, "topic", null)); @@ -228,53 +232,59 @@ public void shouldFetchRangeCorrectlyAcrossSegments() { assertEquals(mkSet(a1, a2, a3, aa1, aa3), keys); } + @Test + public void shouldSetFlushListener() { + assertTrue(cachingStore.setFlushListener(null, true)); + assertTrue(cachingStore.setFlushListener(null, false)); + } + @Test public void shouldForwardChangedValuesDuringFlush() { final Windowed a = new Windowed<>(keyA, new SessionWindow(2, 4)); final Windowed b = new Windowed<>(keyA, new SessionWindow(1, 2)); final Windowed aDeserialized = new Windowed<>("a", new SessionWindow(2, 4)); final Windowed bDeserialized = new Windowed<>("a", new SessionWindow(1, 2)); - final List, Change>> flushed = new ArrayList<>(); - cachingStore.setFlushListener( - (key, newValue, oldValue, timestamp) -> flushed.add(KeyValue.pair(key, new Change<>(newValue, oldValue))), - true - ); + final CachingKeyValueStoreTest.CacheFlushListenerStub, String> flushListener = + new CachingKeyValueStoreTest.CacheFlushListenerStub<>( + new SessionWindowedDeserializer<>(new StringDeserializer()), + new StringDeserializer()); + cachingStore.setFlushListener(flushListener, true); cachingStore.put(b, "1".getBytes()); cachingStore.flush(); assertEquals( - Collections.singletonList(KeyValue.pair(bDeserialized, new Change<>("1", null))), - flushed + Collections.singletonMap(bDeserialized, new Change<>("1", null)), + flushListener.forwarded ); - flushed.clear(); + flushListener.forwarded.clear(); cachingStore.put(a, "1".getBytes()); cachingStore.flush(); assertEquals( - Collections.singletonList(KeyValue.pair(aDeserialized, new Change<>("1", null))), - flushed + Collections.singletonMap(aDeserialized, new Change<>("1", null)), + flushListener.forwarded ); - flushed.clear(); + flushListener.forwarded.clear(); cachingStore.put(a, "2".getBytes()); cachingStore.flush(); assertEquals( - Collections.singletonList(KeyValue.pair(aDeserialized, new Change<>("2", "1"))), - flushed + Collections.singletonMap(aDeserialized, new Change<>("2", "1")), + flushListener.forwarded ); - flushed.clear(); + flushListener.forwarded.clear(); cachingStore.remove(a); cachingStore.flush(); assertEquals( - Collections.singletonList(KeyValue.pair(aDeserialized, new Change<>(null, "2"))), - flushed + Collections.singletonMap(aDeserialized, new Change<>(null, "2")), + flushListener.forwarded ); - flushed.clear(); + flushListener.forwarded.clear(); cachingStore.put(a, "1".getBytes()); cachingStore.put(a, "2".getBytes()); @@ -282,21 +292,21 @@ public void shouldForwardChangedValuesDuringFlush() { cachingStore.flush(); assertEquals( - Collections.emptyList(), - flushed + Collections.emptyMap(), + flushListener.forwarded ); - flushed.clear(); + flushListener.forwarded.clear(); } @Test public void shouldNotForwardChangedValuesDuringFlushWhenSendOldValuesDisabled() { final Windowed a = new Windowed<>(keyA, new SessionWindow(0, 0)); final Windowed aDeserialized = new Windowed<>("a", new SessionWindow(0, 0)); - final List, Change>> flushed = new ArrayList<>(); - cachingStore.setFlushListener( - (key, newValue, oldValue, timestamp) -> flushed.add(KeyValue.pair(key, new Change<>(newValue, oldValue))), - false - ); + final CachingKeyValueStoreTest.CacheFlushListenerStub, String> flushListener = + new CachingKeyValueStoreTest.CacheFlushListenerStub<>( + new SessionWindowedDeserializer<>(new StringDeserializer()), + new StringDeserializer()); + cachingStore.setFlushListener(flushListener, false); cachingStore.put(a, "1".getBytes()); cachingStore.flush(); @@ -308,14 +318,14 @@ public void shouldNotForwardChangedValuesDuringFlushWhenSendOldValuesDisabled() cachingStore.flush(); assertEquals( - Arrays.asList( - KeyValue.pair(aDeserialized, new Change<>("1", null)), - KeyValue.pair(aDeserialized, new Change<>("2", null)), - KeyValue.pair(aDeserialized, new Change<>(null, null)) + mkMap( + mkEntry(aDeserialized, new Change<>("1", null)), + mkEntry(aDeserialized, new Change<>("2", null)), + mkEntry(aDeserialized, new Change<>(null, null)) ), - flushed + flushListener.forwarded ); - flushed.clear(); + flushListener.forwarded.clear(); cachingStore.put(a, "1".getBytes()); cachingStore.put(a, "2".getBytes()); @@ -323,10 +333,10 @@ public void shouldNotForwardChangedValuesDuringFlushWhenSendOldValuesDisabled() cachingStore.flush(); assertEquals( - Collections.emptyList(), - flushed + Collections.emptyMap(), + flushListener.forwarded ); - flushed.clear(); + flushListener.forwarded.clear(); } @Test diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingWindowStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingWindowStoreTest.java index 2bb758eab38a9..da49dded4c7f4 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingWindowStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingWindowStoreTest.java @@ -19,6 +19,7 @@ import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.common.metrics.Metrics; import org.apache.kafka.common.serialization.Serdes; +import org.apache.kafka.common.serialization.StringDeserializer; import org.apache.kafka.common.utils.Bytes; import org.apache.kafka.common.utils.LogContext; import org.apache.kafka.streams.KeyValue; @@ -27,6 +28,7 @@ import org.apache.kafka.streams.TopologyTestDriver; import org.apache.kafka.streams.errors.InvalidStateStoreException; import org.apache.kafka.streams.kstream.Consumed; +import org.apache.kafka.streams.kstream.TimeWindowedDeserializer; import org.apache.kafka.streams.kstream.Transformer; import org.apache.kafka.streams.kstream.Windowed; import org.apache.kafka.streams.kstream.internals.TimeWindow; @@ -64,6 +66,7 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; public class CachingWindowStoreTest { @@ -73,7 +76,7 @@ public class CachingWindowStoreTest { private static final long SEGMENT_INTERVAL = 100L; private InternalMockProcessorContext context; private RocksDBSegmentedBytesStore underlying; - private CachingWindowStore cachingStore; + private CachingWindowStore cachingStore; private CachingKeyValueStoreTest.CacheFlushListenerStub, String> cacheListener; private ThreadCache cache; private String topic; @@ -87,8 +90,10 @@ public void setUp() { underlying, false, WINDOW_SIZE); - cacheListener = new CachingKeyValueStoreTest.CacheFlushListenerStub<>(); - cachingStore = new CachingWindowStore<>(windowStore, Serdes.String(), Serdes.String(), WINDOW_SIZE, SEGMENT_INTERVAL); + final TimeWindowedDeserializer keyDeserializer = new TimeWindowedDeserializer<>(new StringDeserializer(), WINDOW_SIZE); + keyDeserializer.setIsChangelogTopic(true); + cacheListener = new CachingKeyValueStoreTest.CacheFlushListenerStub<>(keyDeserializer, new StringDeserializer()); + cachingStore = new CachingWindowStore(windowStore, WINDOW_SIZE, SEGMENT_INTERVAL); cachingStore.setFlushListener(cacheListener, false); cache = new ThreadCache(new LogContext("testCache "), MAX_CACHE_SIZE_BYTES, new MockStreamsMetrics(new Metrics())); topic = "topic"; @@ -337,6 +342,12 @@ public void shouldForwardDirtyItemsWhenFlushCalled() { assertNull(cacheListener.forwarded.get(windowedKey).oldValue); } + @Test + public void shouldSetFlushListener() { + assertTrue(cachingStore.setFlushListener(null, true)); + assertTrue(cachingStore.setFlushListener(null, false)); + } + @Test public void shouldForwardOldValuesWhenEnabled() { cachingStore.setFlushListener(cacheListener, true); diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStoreTest.java index 23effc93e85de..f4d3d76566155 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStoreTest.java @@ -46,6 +46,12 @@ import static org.apache.kafka.common.utils.Utils.mkEntry; import static org.apache.kafka.common.utils.Utils.mkMap; +import static org.easymock.EasyMock.anyObject; +import static org.easymock.EasyMock.eq; +import static org.easymock.EasyMock.expect; +import static org.easymock.EasyMock.mock; +import static org.easymock.EasyMock.replay; +import static org.easymock.EasyMock.verify; import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.MatcherAssert.assertThat; import static org.junit.Assert.assertFalse; @@ -105,7 +111,7 @@ public void testMetrics() { @Test public void shouldWriteBytesToInnerStoreAndRecordPutMetric() { - inner.put(EasyMock.eq(keyBytes), EasyMock.aryEq(valueBytes)); + inner.put(eq(keyBytes), EasyMock.aryEq(valueBytes)); EasyMock.expectLastCall(); init(); @@ -133,7 +139,7 @@ public void shouldGetBytesFromInnerStoreAndReturnGetMetric() { @Test public void shouldPutIfAbsentAndRecordPutIfAbsentMetric() { - EasyMock.expect(inner.putIfAbsent(EasyMock.eq(keyBytes), EasyMock.aryEq(valueBytes))) + EasyMock.expect(inner.putIfAbsent(eq(keyBytes), EasyMock.aryEq(valueBytes))) .andReturn(null); init(); @@ -223,6 +229,32 @@ public void shouldFlushInnerWhenFlushTimeRecords() { EasyMock.verify(inner); } + private interface CachedKeyValueStore extends KeyValueStore, CachedStateStore { } + + @SuppressWarnings("unchecked") + @Test + public void shouldSetFlushListenerOnWrappedCachingStore() { + final CachedKeyValueStore cachedKeyValueStore = mock(CachedKeyValueStore.class); + + expect(cachedKeyValueStore.setFlushListener(anyObject(CacheFlushListener.class), eq(false))).andReturn(true); + replay(cachedKeyValueStore); + + metered = new MeteredKeyValueStore<>( + cachedKeyValueStore, + "scope", + new MockTime(), + Serdes.String(), + Serdes.String() + ); + assertTrue(metered.setFlushListener(null, false)); + + verify(cachedKeyValueStore); + } + + @Test + public void shouldNotSetFlushListenerOnWrappedNoneCachingStore() { + assertFalse(metered.setFlushListener(null, false)); + } private KafkaMetric metric(final MetricName metricName) { return this.metrics.metric(metricName); diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredSessionStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredSessionStoreTest.java index fd0496804ea69..7dbf192dee95e 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredSessionStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredSessionStoreTest.java @@ -34,7 +34,6 @@ import org.apache.kafka.streams.state.KeyValueIterator; import org.apache.kafka.streams.state.SessionStore; import org.apache.kafka.test.KeyValueIteratorStub; -import org.easymock.EasyMock; import org.easymock.EasyMockRunner; import org.easymock.Mock; import org.easymock.MockType; @@ -47,6 +46,14 @@ import static org.apache.kafka.common.utils.Utils.mkEntry; import static org.apache.kafka.common.utils.Utils.mkMap; +import static org.easymock.EasyMock.anyObject; +import static org.easymock.EasyMock.aryEq; +import static org.easymock.EasyMock.eq; +import static org.easymock.EasyMock.expect; +import static org.easymock.EasyMock.expectLastCall; +import static org.easymock.EasyMock.mock; +import static org.easymock.EasyMock.replay; +import static org.easymock.EasyMock.verify; import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.MatcherAssert.assertThat; import static org.junit.Assert.assertFalse; @@ -72,22 +79,22 @@ public class MeteredSessionStoreTest { private final byte[] keyBytes = key.getBytes(); private final Windowed windowedKeyBytes = new Windowed<>(Bytes.wrap(keyBytes), new SessionWindow(0, 0)); - @Before public void before() { - metered = new MeteredSessionStore<>(inner, - "scope", - Serdes.String(), - Serdes.String(), - new MockTime()); + metered = new MeteredSessionStore<>( + inner, + "scope", + Serdes.String(), + Serdes.String(), + new MockTime()); metrics.config().recordLevel(Sensor.RecordingLevel.DEBUG); - EasyMock.expect(context.metrics()).andReturn(new MockStreamsMetrics(metrics)); - EasyMock.expect(context.taskId()).andReturn(taskId); - EasyMock.expect(inner.name()).andReturn("metered").anyTimes(); + expect(context.metrics()).andReturn(new MockStreamsMetrics(metrics)); + expect(context.taskId()).andReturn(taskId); + expect(inner.name()).andReturn("metered").anyTimes(); } private void init() { - EasyMock.replay(inner, context); + replay(inner, context); metered.init(context, metered); } @@ -104,20 +111,20 @@ public void testMetrics() { @Test public void shouldWriteBytesToInnerStoreAndRecordPutMetric() { - inner.put(EasyMock.eq(windowedKeyBytes), EasyMock.aryEq(keyBytes)); - EasyMock.expectLastCall(); + inner.put(eq(windowedKeyBytes), aryEq(keyBytes)); + expectLastCall(); init(); metered.put(new Windowed<>(key, new SessionWindow(0, 0)), key); final KafkaMetric metric = metric("put-rate"); assertTrue(((Double) metric.metricValue()) > 0); - EasyMock.verify(inner); + verify(inner); } @Test public void shouldFindSessionsFromStoreAndRecordFetchMetric() { - EasyMock.expect(inner.findSessions(Bytes.wrap(keyBytes), 0, 0)) + expect(inner.findSessions(Bytes.wrap(keyBytes), 0, 0)) .andReturn(new KeyValueIteratorStub<>( Collections.singleton(KeyValue.pair(windowedKeyBytes, keyBytes)).iterator())); init(); @@ -129,12 +136,12 @@ public void shouldFindSessionsFromStoreAndRecordFetchMetric() { final KafkaMetric metric = metric("fetch-rate"); assertTrue((Double) metric.metricValue() > 0); - EasyMock.verify(inner); + verify(inner); } @Test public void shouldFindSessionRangeFromStoreAndRecordFetchMetric() { - EasyMock.expect(inner.findSessions(Bytes.wrap(keyBytes), Bytes.wrap(keyBytes), 0, 0)) + expect(inner.findSessions(Bytes.wrap(keyBytes), Bytes.wrap(keyBytes), 0, 0)) .andReturn(new KeyValueIteratorStub<>( Collections.singleton(KeyValue.pair(windowedKeyBytes, keyBytes)).iterator())); init(); @@ -146,13 +153,13 @@ public void shouldFindSessionRangeFromStoreAndRecordFetchMetric() { final KafkaMetric metric = metric("fetch-rate"); assertTrue((Double) metric.metricValue() > 0); - EasyMock.verify(inner); + verify(inner); } @Test public void shouldRemoveFromStoreAndRecordRemoveMetric() { inner.remove(windowedKeyBytes); - EasyMock.expectLastCall(); + expectLastCall(); init(); @@ -160,12 +167,12 @@ public void shouldRemoveFromStoreAndRecordRemoveMetric() { final KafkaMetric metric = metric("remove-rate"); assertTrue((Double) metric.metricValue() > 0); - EasyMock.verify(inner); + verify(inner); } @Test public void shouldFetchForKeyAndRecordFetchMetric() { - EasyMock.expect(inner.findSessions(Bytes.wrap(keyBytes), 0, Long.MAX_VALUE)) + expect(inner.findSessions(Bytes.wrap(keyBytes), 0, Long.MAX_VALUE)) .andReturn(new KeyValueIteratorStub<>( Collections.singleton(KeyValue.pair(windowedKeyBytes, keyBytes)).iterator())); init(); @@ -177,12 +184,12 @@ public void shouldFetchForKeyAndRecordFetchMetric() { final KafkaMetric metric = metric("fetch-rate"); assertTrue((Double) metric.metricValue() > 0); - EasyMock.verify(inner); + verify(inner); } @Test public void shouldFetchRangeFromStoreAndRecordFetchMetric() { - EasyMock.expect(inner.findSessions(Bytes.wrap(keyBytes), Bytes.wrap(keyBytes), 0, Long.MAX_VALUE)) + expect(inner.findSessions(Bytes.wrap(keyBytes), Bytes.wrap(keyBytes), 0, Long.MAX_VALUE)) .andReturn(new KeyValueIteratorStub<>( Collections.singleton(KeyValue.pair(windowedKeyBytes, keyBytes)).iterator())); init(); @@ -194,7 +201,7 @@ public void shouldFetchRangeFromStoreAndRecordFetchMetric() { final KafkaMetric metric = metric("fetch-rate"); assertTrue((Double) metric.metricValue() > 0); - EasyMock.verify(inner); + verify(inner); } @Test @@ -244,6 +251,32 @@ public void shouldThrowNullPointerOnFindSessionsRangeIfToIsNull() { metered.findSessions("a", null, 0, 0); } + private interface CachedSessionStore extends SessionStore, CachedStateStore { } + + @SuppressWarnings("unchecked") + @Test + public void shouldSetFlushListenerOnWrappedCachingStore() { + final CachedSessionStore cachedSessionStore = mock(CachedSessionStore.class); + + expect(cachedSessionStore.setFlushListener(anyObject(CacheFlushListener.class), eq(false))).andReturn(true); + replay(cachedSessionStore); + + metered = new MeteredSessionStore<>( + cachedSessionStore, + "scope", + Serdes.String(), + Serdes.String(), + new MockTime()); + assertTrue(metered.setFlushListener(null, false)); + + verify(cachedSessionStore); + } + + @Test + public void shouldNotSetFlushListenerOnWrappedNoneCachingStore() { + assertFalse(metered.setFlushListener(null, false)); + } + private KafkaMetric metric(final String name) { return this.metrics.metric(new MetricName(name, "stream-scope-metrics", "", this.tags)); } diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredWindowStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredWindowStoreTest.java index 97a835ea6eccb..b4ddb9fddf11a 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredWindowStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredWindowStoreTest.java @@ -28,14 +28,12 @@ import org.apache.kafka.common.utils.MockTime; import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.kstream.Windowed; -import org.apache.kafka.streams.processor.internals.RecordCollector; import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl; import org.apache.kafka.streams.state.WindowStore; import org.apache.kafka.test.InternalMockProcessorContext; import org.apache.kafka.test.NoOpRecordCollector; import org.apache.kafka.test.StreamsTestUtils; import org.apache.kafka.test.TestUtils; -import org.easymock.EasyMock; import org.junit.Before; import org.junit.Test; @@ -44,16 +42,26 @@ import static java.time.Instant.ofEpochMilli; import static java.util.Collections.singletonMap; import static org.apache.kafka.test.StreamsTestUtils.getMetricByNameFilterByTags; +import static org.easymock.EasyMock.anyObject; +import static org.easymock.EasyMock.createNiceMock; +import static org.easymock.EasyMock.eq; +import static org.easymock.EasyMock.expect; +import static org.easymock.EasyMock.expectLastCall; +import static org.easymock.EasyMock.mock; +import static org.easymock.EasyMock.replay; +import static org.easymock.EasyMock.verify; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; public class MeteredWindowStoreTest { private InternalMockProcessorContext context; @SuppressWarnings("unchecked") - private final WindowStore innerStoreMock = EasyMock.createNiceMock(WindowStore.class); + private final WindowStore innerStoreMock = createNiceMock(WindowStore.class); private final MeteredWindowStore store = new MeteredWindowStore<>( innerStoreMock, + 10L, // any size "scope", new MockTime(), Serdes.String(), @@ -62,7 +70,7 @@ public class MeteredWindowStoreTest { private final Metrics metrics = new Metrics(new MetricConfig().recordLevel(Sensor.RecordingLevel.DEBUG)); { - EasyMock.expect(innerStoreMock.name()).andReturn("mocked-store").anyTimes(); + expect(innerStoreMock.name()).andReturn("mocked-store").anyTimes(); } @Before @@ -75,19 +83,14 @@ public void setUp() { Serdes.Long(), streamsMetrics, new StreamsConfig(StreamsTestUtils.getStreamsConfig()), - new RecordCollector.Supplier() { - @Override - public RecordCollector recordCollector() { - return new NoOpRecordCollector(); - } - }, + NoOpRecordCollector::new, new ThreadCache(new LogContext("testCache "), 0, streamsMetrics) ); } @Test public void testMetrics() { - EasyMock.replay(innerStoreMock); + replay(innerStoreMock); store.init(context, store); final JmxReporter reporter = new JmxReporter("kafka.streams"); metrics.addReporter(reporter); @@ -100,8 +103,8 @@ public void testMetrics() { @Test public void shouldRecordRestoreLatencyOnInit() { innerStoreMock.init(context, store); - EasyMock.expectLastCall(); - EasyMock.replay(innerStoreMock); + expectLastCall(); + replay(innerStoreMock); store.init(context, store); final Map metrics = context.metrics().metrics(); assertEquals(1.0, getMetricByNameFilterByTags(metrics, "restore-total", "stream-scope-metrics", singletonMap("scope-id", "all")).metricValue()); @@ -111,79 +114,104 @@ public void shouldRecordRestoreLatencyOnInit() { @Test public void shouldRecordPutLatency() { final byte[] bytes = "a".getBytes(); - innerStoreMock.put(EasyMock.eq(Bytes.wrap(bytes)), EasyMock.anyObject(), EasyMock.eq(context.timestamp())); - EasyMock.expectLastCall(); - EasyMock.replay(innerStoreMock); + innerStoreMock.put(eq(Bytes.wrap(bytes)), anyObject(), eq(context.timestamp())); + expectLastCall(); + replay(innerStoreMock); store.init(context, store); store.put("a", "a"); final Map metrics = context.metrics().metrics(); assertEquals(1.0, getMetricByNameFilterByTags(metrics, "put-total", "stream-scope-metrics", singletonMap("scope-id", "all")).metricValue()); assertEquals(1.0, getMetricByNameFilterByTags(metrics, "put-total", "stream-scope-metrics", singletonMap("scope-id", "mocked-store")).metricValue()); - EasyMock.verify(innerStoreMock); + verify(innerStoreMock); } @Test public void shouldRecordFetchLatency() { - EasyMock.expect(innerStoreMock.fetch(Bytes.wrap("a".getBytes()), 1, 1)).andReturn(KeyValueIterators.emptyWindowStoreIterator()); - EasyMock.replay(innerStoreMock); + expect(innerStoreMock.fetch(Bytes.wrap("a".getBytes()), 1, 1)).andReturn(KeyValueIterators.emptyWindowStoreIterator()); + replay(innerStoreMock); store.init(context, store); store.fetch("a", ofEpochMilli(1), ofEpochMilli(1)).close(); // recorded on close; final Map metrics = context.metrics().metrics(); assertEquals(1.0, getMetricByNameFilterByTags(metrics, "fetch-total", "stream-scope-metrics", singletonMap("scope-id", "all")).metricValue()); assertEquals(1.0, getMetricByNameFilterByTags(metrics, "fetch-total", "stream-scope-metrics", singletonMap("scope-id", "mocked-store")).metricValue()); - EasyMock.verify(innerStoreMock); + verify(innerStoreMock); } @Test public void shouldRecordFetchRangeLatency() { - EasyMock.expect(innerStoreMock.fetch(Bytes.wrap("a".getBytes()), Bytes.wrap("b".getBytes()), 1, 1)).andReturn(KeyValueIterators., byte[]>emptyIterator()); - EasyMock.replay(innerStoreMock); + expect(innerStoreMock.fetch(Bytes.wrap("a".getBytes()), Bytes.wrap("b".getBytes()), 1, 1)).andReturn(KeyValueIterators., byte[]>emptyIterator()); + replay(innerStoreMock); store.init(context, store); store.fetch("a", "b", ofEpochMilli(1), ofEpochMilli(1)).close(); // recorded on close; final Map metrics = context.metrics().metrics(); assertEquals(1.0, getMetricByNameFilterByTags(metrics, "fetch-total", "stream-scope-metrics", singletonMap("scope-id", "all")).metricValue()); assertEquals(1.0, getMetricByNameFilterByTags(metrics, "fetch-total", "stream-scope-metrics", singletonMap("scope-id", "mocked-store")).metricValue()); - EasyMock.verify(innerStoreMock); + verify(innerStoreMock); } - @Test public void shouldRecordFlushLatency() { innerStoreMock.flush(); - EasyMock.expectLastCall(); - EasyMock.replay(innerStoreMock); + expectLastCall(); + replay(innerStoreMock); store.init(context, store); store.flush(); final Map metrics = context.metrics().metrics(); assertEquals(1.0, getMetricByNameFilterByTags(metrics, "flush-total", "stream-scope-metrics", singletonMap("scope-id", "all")).metricValue()); assertEquals(1.0, getMetricByNameFilterByTags(metrics, "flush-total", "stream-scope-metrics", singletonMap("scope-id", "mocked-store")).metricValue()); - EasyMock.verify(innerStoreMock); + verify(innerStoreMock); } - @Test public void shouldCloseUnderlyingStore() { innerStoreMock.close(); - EasyMock.expectLastCall(); - EasyMock.replay(innerStoreMock); + expectLastCall(); + replay(innerStoreMock); store.init(context, store); store.close(); - EasyMock.verify(innerStoreMock); + verify(innerStoreMock); } - @Test public void shouldNotExceptionIfFetchReturnsNull() { - EasyMock.expect(innerStoreMock.fetch(Bytes.wrap("a".getBytes()), 0)).andReturn(null); - EasyMock.replay(innerStoreMock); + expect(innerStoreMock.fetch(Bytes.wrap("a".getBytes()), 0)).andReturn(null); + replay(innerStoreMock); store.init(context, store); assertNull(store.fetch("a", 0)); } + private interface CachedWindowStore extends WindowStore, CachedStateStore { } + + @SuppressWarnings("unchecked") + @Test + public void shouldSetFlushListenerOnWrappedCachingStore() { + final CachedWindowStore cachedWindowStore = mock(CachedWindowStore.class); + + expect(cachedWindowStore.setFlushListener(anyObject(CacheFlushListener.class), eq(false))).andReturn(true); + replay(cachedWindowStore); + + final MeteredWindowStore metered = new MeteredWindowStore<>( + cachedWindowStore, + 10L, // any size + "scope", + new MockTime(), + Serdes.String(), + new SerdeThatDoesntHandleNull() + ); + assertTrue(metered.setFlushListener(null, false)); + + verify(cachedWindowStore); + } + + @Test + public void shouldNotSetFlushListenerOnWrappedNoneCachingStore() { + assertFalse(store.setFlushListener(null, false)); + } + } diff --git a/streams/src/test/java/org/apache/kafka/test/GenericInMemoryKeyValueStore.java b/streams/src/test/java/org/apache/kafka/test/GenericInMemoryKeyValueStore.java index e9d20f1a4fbd4..a5e345efc5b45 100644 --- a/streams/src/test/java/org/apache/kafka/test/GenericInMemoryKeyValueStore.java +++ b/streams/src/test/java/org/apache/kafka/test/GenericInMemoryKeyValueStore.java @@ -16,30 +16,38 @@ */ package org.apache.kafka.test; -import java.util.Iterator; -import java.util.List; -import java.util.Map; -import java.util.Map.Entry; -import java.util.NavigableMap; -import java.util.TreeMap; import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.processor.ProcessorContext; import org.apache.kafka.streams.processor.StateStore; import org.apache.kafka.streams.state.KeyValueIterator; import org.apache.kafka.streams.state.KeyValueStore; +import org.apache.kafka.streams.state.internals.CacheFlushListener; import org.apache.kafka.streams.state.internals.DelegatingPeekingKeyValueIterator; +import org.apache.kafka.streams.state.internals.WrappedStateStore; + +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.NavigableMap; +import java.util.TreeMap; /** * This class is a generic version of the in-memory key-value store that is useful for testing when you * need a basic KeyValueStore for arbitrary types and don't have/want to write a serde */ -public class GenericInMemoryKeyValueStore implements KeyValueStore { +public class GenericInMemoryKeyValueStore + extends WrappedStateStore + implements KeyValueStore { private final String name; private final NavigableMap map; private volatile boolean open = false; public GenericInMemoryKeyValueStore(final String name) { + // it's not really a `WrappedStateStore` so we pass `null` + // however, we need to implement `WrappedStateStore` to make the store usable + super(null); this.name = name; this.map = new TreeMap<>(); @@ -52,7 +60,8 @@ public String name() { @Override @SuppressWarnings("unchecked") - /* This is a "dummy" store used for testing and does not support restoring from changelog since we allow it to be serde-ignorant */ + /* This is a "dummy" store used for testing; + it does not support restoring from changelog since we allow it to be serde-ignorant */ public void init(final ProcessorContext context, final StateStore root) { if (root != null) { context.register(root, null); @@ -61,6 +70,12 @@ public void init(final ProcessorContext context, final StateStore root) { this.open = true; } + @Override + public boolean setFlushListener(final CacheFlushListener listener, + final boolean sendOldValues) { + return false; + } + @Override public boolean persistent() { return false;