From dd5bfd2492bb74feffcbbd9c5f6238af76112a40 Mon Sep 17 00:00:00 2001 From: Rohan Desai Date: Fri, 13 Dec 2024 18:10:34 -0800 Subject: [PATCH 1/2] transition KTable#filter impl to use processor wrapper This patch transitions the KTable#filter implementation to provide the materialized store via the ProcessorSupplier so that it can be wrapped by the processor wrapper if the wrapper is configured --- .../kstream/internals/KTableFilter.java | 17 ++++++++++++- .../streams/kstream/internals/KTableImpl.java | 8 ++----- .../internals/graph/TableFilterNode.java | 6 ++--- .../kafka/streams/StreamsBuilderTest.java | 24 +++++++++++++++++-- 4 files changed, 42 insertions(+), 13 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableFilter.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableFilter.java index 475ea85db940a..b4f44bdab5eb1 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableFilter.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableFilter.java @@ -16,10 +16,14 @@ */ package org.apache.kafka.streams.kstream.internals; +import java.util.Collections; +import java.util.Set; import org.apache.kafka.streams.kstream.Predicate; import org.apache.kafka.streams.processor.api.Processor; import org.apache.kafka.streams.processor.api.ProcessorContext; import org.apache.kafka.streams.processor.api.Record; +import org.apache.kafka.streams.processor.internals.StoreFactory; +import org.apache.kafka.streams.state.StoreBuilder; import org.apache.kafka.streams.state.ValueAndTimestamp; import org.apache.kafka.streams.state.internals.KeyValueStoreWrapper; @@ -34,17 +38,20 @@ public class KTableFilter implements KTableProcessorSupplier parent, final Predicate predicate, final boolean filterNot, - final String queryableName) { + final String queryableName, + final StoreFactory storeFactory) { this.parent = parent; this.predicate = predicate; this.filterNot = filterNot; this.queryableName = queryableName; // If upstream is already materialized, enable sending old values to avoid sending unnecessary tombstones: this.sendOldValues = parent.enableSendingOldValues(false); + this.storeFactory = storeFactory; } public void setUseVersionedSemantics(final boolean useVersionedSemantics) { @@ -61,6 +68,14 @@ public Processor, KIn, Change> get() { return new KTableFilterProcessor(); } + @Override + public Set> stores() { + if (storeFactory == null) { + return null; + } + return Collections.singleton(new StoreFactory.FactoryWrappingStoreBuilder<>(storeFactory)); + } + @Override public boolean enableSendingOldValues(final boolean forceMaterialization) { if (queryableName != null) { diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java index aa07714167738..40c565c8ceb4b 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java @@ -209,17 +209,13 @@ private KTable doFilter(final Predicate predicate, final String name = new NamedInternal(named).orElseGenerateWithPrefix(builder, FILTER_NAME); final KTableProcessorSupplier processorSupplier = - new KTableFilter<>(this, predicate, filterNot, queryableStoreName); + new KTableFilter<>(this, predicate, filterNot, queryableStoreName, storeFactory); final ProcessorParameters processorParameters = unsafeCastProcessorParametersToCompletelyDifferentType( new ProcessorParameters<>(processorSupplier, name) ); - final GraphNode tableNode = new TableFilterNode<>( - name, - processorParameters, - storeFactory - ); + final GraphNode tableNode = new TableFilterNode<>(name, processorParameters); maybeSetOutputVersioned(tableNode, materializedInternal); builder.addGraphNode(this.graphNode, tableNode); diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/TableFilterNode.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/TableFilterNode.java index 38033693ebb62..6fef05604cf68 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/TableFilterNode.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/TableFilterNode.java @@ -19,14 +19,12 @@ import org.apache.kafka.streams.kstream.internals.KTableFilter; import org.apache.kafka.streams.processor.api.ProcessorSupplier; -import org.apache.kafka.streams.processor.internals.StoreFactory; public class TableFilterNode extends TableProcessorNode implements VersionedSemanticsGraphNode { public TableFilterNode(final String nodeName, - final ProcessorParameters processorParameters, - final StoreFactory storeFactory) { - super(nodeName, processorParameters, storeFactory, null); + final ProcessorParameters processorParameters) { + super(nodeName, processorParameters, null, null); } @SuppressWarnings("unchecked") diff --git a/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java b/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java index b20bec101bca5..71905e1481c8f 100644 --- a/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java @@ -1642,6 +1642,26 @@ public void shouldWrapProcessorsForMapValuesWithMaterializedStore() { assertThat(counter.numConnectedStateStores(), is(1)); } + @Test + public void shouldWrapProcessorAndStoreForFilterTable() { + final Map props = dummyStreamsConfigMap(); + props.put(PROCESSOR_WRAPPER_CLASS_CONFIG, RecordingProcessorWrapper.class); + final WrapperRecorder counter = new WrapperRecorder(); + props.put(PROCESSOR_WRAPPER_COUNTER_CONFIG, counter); + final StreamsBuilder builder = new StreamsBuilder(new TopologyConfig(new StreamsConfig(props))); + + builder.table("input", Consumed.as("source-table")) + .filter((k, v) -> true, Named.as("filter"), Materialized.as("filter")) + .toStream(Named.as("to-stream")) + .to("output-topic", Produced.as("sink")); + builder.build(); + + assertThat(counter.wrappedProcessorNames(), + Matchers.containsInAnyOrder("source-table", "filter", "to-stream")); + assertThat(counter.numUniqueStateStores(), is(1)); + assertThat(counter.numConnectedStateStores(), is(1)); + } + @Test public void shouldWrapProcessorsForTableAggregate() { final Map props = dummyStreamsConfigMap(); @@ -1744,8 +1764,8 @@ public void shouldWrapProcessorsWhenMultipleTableOperators() { "to-table", "map-values", "map-values-stateful", "filter-table", "filter-table-stateful", "to-stream" )); - assertThat(counter.numUniqueStateStores(), CoreMatchers.is(1)); - assertThat(counter.numConnectedStateStores(), CoreMatchers.is(1)); + assertThat(counter.numUniqueStateStores(), CoreMatchers.is(2)); + assertThat(counter.numConnectedStateStores(), CoreMatchers.is(2)); } @Test From cb959d7381b98e36c5bd7de76440ec259d37058d Mon Sep 17 00:00:00 2001 From: Rohan Desai Date: Tue, 17 Dec 2024 00:24:29 -0800 Subject: [PATCH 2/2] fix spotless --- .../apache/kafka/streams/kstream/internals/KTableFilter.java | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableFilter.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableFilter.java index b4f44bdab5eb1..91e2fac9411d0 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableFilter.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableFilter.java @@ -16,8 +16,6 @@ */ package org.apache.kafka.streams.kstream.internals; -import java.util.Collections; -import java.util.Set; import org.apache.kafka.streams.kstream.Predicate; import org.apache.kafka.streams.processor.api.Processor; import org.apache.kafka.streams.processor.api.ProcessorContext; @@ -27,6 +25,9 @@ import org.apache.kafka.streams.state.ValueAndTimestamp; import org.apache.kafka.streams.state.internals.KeyValueStoreWrapper; +import java.util.Collections; +import java.util.Set; + import static org.apache.kafka.streams.state.ValueAndTimestamp.getValueOrNull; import static org.apache.kafka.streams.state.VersionedKeyValueStore.PUT_RETURN_CODE_NOT_PUT; import static org.apache.kafka.streams.state.internals.KeyValueStoreWrapper.PUT_RETURN_CODE_IS_LATEST;