From e61019d7edf5e4a14abfb4b586a421e97bac8ea7 Mon Sep 17 00:00:00 2001 From: Almog Gavra Date: Wed, 11 Dec 2024 17:22:57 -0800 Subject: [PATCH 1/2] KAFKA-18026: KIP-1112 migrate KTableSuppressed --- .../streams/kstream/internals/KTableImpl.java | 15 ++++++----- .../internals/graph/TableSuppressNode.java | 6 ++--- .../KTableSuppressProcessorSupplier.java | 20 ++++++++++----- .../kafka/streams/StreamsBuilderTest.java | 25 +++++++++++++++++++ .../KTableSuppressProcessorMetricsTest.java | 10 +++++++- .../suppress/KTableSuppressProcessorTest.java | 10 +++++++- 6 files changed, 66 insertions(+), 20 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java index aa07714167738..9ff1a134a9d2b 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java @@ -69,7 +69,6 @@ import org.apache.kafka.streams.processor.api.ProcessorSupplier; import org.apache.kafka.streams.processor.internals.InternalTopicProperties; import org.apache.kafka.streams.processor.internals.StaticTopicNameExtractor; -import org.apache.kafka.streams.processor.internals.StoreBuilderWrapper; import org.apache.kafka.streams.processor.internals.StoreFactory; import org.apache.kafka.streams.state.KeyValueStore; import org.apache.kafka.streams.state.StoreBuilder; @@ -565,12 +564,6 @@ public KTable suppress(final Suppressed suppressed) { final String storeName = suppressedInternal.name() != null ? suppressedInternal.name() + "-store" : builder.newStoreName(SUPPRESS_NAME); - final ProcessorSupplier, K, Change> suppressionSupplier = new KTableSuppressProcessorSupplier<>( - suppressedInternal, - storeName, - this - ); - final StoreBuilder>> storeBuilder; if (suppressedInternal.bufferConfig().isLoggingEnabled()) { @@ -588,10 +581,16 @@ public KTable suppress(final Suppressed suppressed) { .withLoggingDisabled(); } + final ProcessorSupplier, K, Change> suppressionSupplier = new KTableSuppressProcessorSupplier<>( + suppressedInternal, + storeBuilder, + this + ); + final ProcessorGraphNode> node = new TableSuppressNode<>( name, new ProcessorParameters<>(suppressionSupplier, name), - StoreBuilderWrapper.wrapStoreBuilder(storeBuilder) + new String[]{storeName} ); node.setOutputVersioned(false); diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/TableSuppressNode.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/TableSuppressNode.java index 88e55f37a25ef..595d0266aaea5 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/TableSuppressNode.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/TableSuppressNode.java @@ -16,12 +16,10 @@ */ package org.apache.kafka.streams.kstream.internals.graph; -import org.apache.kafka.streams.processor.internals.StoreFactory; - public class TableSuppressNode extends StatefulProcessorNode { public TableSuppressNode(final String nodeName, final ProcessorParameters processorParameters, - final StoreFactory materializedKTableStoreBuilder) { - super(nodeName, processorParameters, materializedKTableStoreBuilder); + final String[] storeNames) { + super(nodeName, processorParameters, storeNames); } } diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessorSupplier.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessorSupplier.java index 0b0c6ca15e9f7..637c0a757e35c 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessorSupplier.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessorSupplier.java @@ -33,23 +33,26 @@ import org.apache.kafka.streams.processor.internals.ProcessorRecordContext; import org.apache.kafka.streams.processor.internals.SerdeGetter; import org.apache.kafka.streams.processor.internals.metrics.ProcessorNodeMetrics; +import org.apache.kafka.streams.state.StoreBuilder; import org.apache.kafka.streams.state.ValueAndTimestamp; import org.apache.kafka.streams.state.internals.Maybe; import org.apache.kafka.streams.state.internals.TimeOrderedKeyValueBuffer; +import java.util.Set; + import static java.util.Objects.requireNonNull; public class KTableSuppressProcessorSupplier implements KTableProcessorSupplier { private final SuppressedInternal suppress; - private final String storeName; + private final StoreBuilder storeBuilder; private final KTableImpl parentKTable; public KTableSuppressProcessorSupplier(final SuppressedInternal suppress, - final String storeName, + final StoreBuilder storeBuilder, final KTableImpl parentKTable) { this.suppress = suppress; - this.storeName = storeName; + this.storeBuilder = storeBuilder; this.parentKTable = parentKTable; // The suppress buffer requires seeing the old values, to support the prior value view. parentKTable.enableSendingOldValues(true); @@ -57,7 +60,12 @@ public KTableSuppressProcessorSupplier(final SuppressedInternal suppress, @Override public Processor, K, Change> get() { - return new KTableSuppressProcessor<>(suppress, storeName); + return new KTableSuppressProcessor<>(suppress, storeBuilder.name()); + } + + @Override + public Set> stores() { + return Set.of(storeBuilder); } @Override @@ -75,7 +83,7 @@ public KTableValueGetter get() { public void init(final ProcessorContext context) { parentGetter.init(context); // the main processor is responsible for the buffer's lifecycle - buffer = requireNonNull(context.getStateStore(storeName)); + buffer = requireNonNull(context.getStateStore(storeBuilder.name())); } @Override @@ -107,7 +115,7 @@ public String[] storeNames() { final String[] parentStores = parentValueGetterSupplier.storeNames(); final String[] stores = new String[1 + parentStores.length]; System.arraycopy(parentStores, 0, stores, 1, parentStores.length); - stores[0] = storeName; + stores[0] = storeBuilder.name(); return stores; } }; diff --git a/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java b/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java index b20bec101bca5..64fdf6b3bb2f4 100644 --- a/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java @@ -39,6 +39,7 @@ import org.apache.kafka.streams.kstream.SlidingWindows; import org.apache.kafka.streams.kstream.StreamJoined; import org.apache.kafka.streams.kstream.TableJoined; +import org.apache.kafka.streams.kstream.Suppressed; import org.apache.kafka.streams.kstream.TimeWindows; import org.apache.kafka.streams.processor.StateStore; import org.apache.kafka.streams.processor.api.Processor; @@ -1517,6 +1518,30 @@ public void shouldWrapProcessorsForStreamAggregate() { assertThat(counter.numConnectedStateStores(), CoreMatchers.is(1)); } + @Test + public void shouldWrapProcessorsForSuppress() { + final Map props = dummyStreamsConfigMap(); + props.put(PROCESSOR_WRAPPER_CLASS_CONFIG, RecordingProcessorWrapper.class); + + final WrapperRecorder counter = new WrapperRecorder(); + props.put(PROCESSOR_WRAPPER_COUNTER_CONFIG, counter); + + final StreamsBuilder builder = new StreamsBuilder(new TopologyConfig(new StreamsConfig(props))); + + builder.stream("input", Consumed.as("source")) + .groupByKey() + .count(Named.as("count"))// wrapped 1 + .suppress(Suppressed.untilTimeLimit(Duration.ofSeconds(10), Suppressed.BufferConfig.unbounded()).withName("suppressed")) // wrapped 2 + .toStream(Named.as("toStream"))// wrapped 3 + .to("output", Produced.as("sink")); + + builder.build(); + assertThat(counter.numWrappedProcessors(), CoreMatchers.is(3)); + assertThat(counter.wrappedProcessorNames(), Matchers.containsInAnyOrder("count", "toStream", "suppressed")); + assertThat(counter.numUniqueStateStores(), CoreMatchers.is(2)); + assertThat(counter.numConnectedStateStores(), CoreMatchers.is(2)); + } + @Test public void shouldWrapProcessorsForTimeWindowStreamAggregate() { final Map props = dummyStreamsConfigMap(); diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessorMetricsTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessorMetricsTest.java index bfa803f89bb9b..d051d6a6acf68 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessorMetricsTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessorMetricsTest.java @@ -30,6 +30,7 @@ import org.apache.kafka.streams.processor.api.Processor; import org.apache.kafka.streams.processor.api.Record; import org.apache.kafka.streams.processor.internals.ProcessorNode; +import org.apache.kafka.streams.state.StoreBuilder; import org.apache.kafka.streams.state.internals.InMemoryTimeOrderedKeyValueChangeBuffer; import org.apache.kafka.test.MockInternalNewProcessorContext; import org.apache.kafka.test.StreamsTestUtils; @@ -38,6 +39,7 @@ import org.hamcrest.Matcher; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mockito; import org.mockito.junit.jupiter.MockitoExtension; import org.mockito.junit.jupiter.MockitoSettings; import org.mockito.quality.Strictness; @@ -144,7 +146,7 @@ public void shouldRecordMetricsWithBuiltInMetricsVersionLatest() { final Processor, String, Change> processor = new KTableSuppressProcessorSupplier<>( (SuppressedInternal) Suppressed.untilTimeLimit(Duration.ofDays(100), maxRecords(1)), - storeName, + mockBuilderWithName(storeName), mock ).get(); @@ -206,4 +208,10 @@ private static void verifyMetric(final Map met assertThat(metrics.get(metricName).metricName().description(), is(metricName.description())); assertThat((T) metrics.get(metricName).metricValue(), matcher); } + + private StoreBuilder mockBuilderWithName(final String name) { + final StoreBuilder builder = Mockito.mock(StoreBuilder.class); + Mockito.when(builder.name()).thenReturn(name); + return builder; + } } \ No newline at end of file diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessorTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessorTest.java index 36d09f5bafe95..e34e58c78c2d4 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessorTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessorTest.java @@ -35,6 +35,7 @@ import org.apache.kafka.streams.processor.api.Processor; import org.apache.kafka.streams.processor.api.Record; import org.apache.kafka.streams.processor.internals.ProcessorNode; +import org.apache.kafka.streams.state.StoreBuilder; import org.apache.kafka.streams.state.internals.InMemoryTimeOrderedKeyValueChangeBuffer; import org.apache.kafka.test.MockInternalNewProcessorContext; @@ -43,6 +44,7 @@ import org.hamcrest.Matcher; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mockito; import org.mockito.junit.jupiter.MockitoExtension; import org.mockito.junit.jupiter.MockitoSettings; import org.mockito.quality.Strictness; @@ -92,7 +94,7 @@ private static class Harness { @SuppressWarnings("unchecked") final KTableImpl parent = mock(KTableImpl.class); final Processor, K, Change> processor = - new KTableSuppressProcessorSupplier<>((SuppressedInternal) suppressed, storeName, parent).get(); + new KTableSuppressProcessorSupplier<>((SuppressedInternal) suppressed, mockBuilderWithName(storeName), parent).get(); final MockInternalNewProcessorContext> context = new MockInternalNewProcessorContext<>(); context.setCurrentNode(new ProcessorNode("testNode")); @@ -487,4 +489,10 @@ private static Serde> timeWindowedSerdeFrom(final Class rawTy new TimeWindowedDeserializer<>(kSerde.deserializer(), windowSize) ); } + + private static StoreBuilder mockBuilderWithName(final String name) { + final StoreBuilder builder = Mockito.mock(StoreBuilder.class); + Mockito.when(builder.name()).thenReturn(name); + return builder; + } } \ No newline at end of file From 32800aeb078a600b1afb55cc07919c2118f2414c Mon Sep 17 00:00:00 2001 From: Almog Gavra Date: Tue, 17 Dec 2024 11:10:06 -0800 Subject: [PATCH 2/2] spotless --- .../test/java/org/apache/kafka/streams/StreamsBuilderTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java b/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java index 64fdf6b3bb2f4..a44c2887f8c55 100644 --- a/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java @@ -38,8 +38,8 @@ import org.apache.kafka.streams.kstream.SessionWindows; import org.apache.kafka.streams.kstream.SlidingWindows; import org.apache.kafka.streams.kstream.StreamJoined; -import org.apache.kafka.streams.kstream.TableJoined; import org.apache.kafka.streams.kstream.Suppressed; +import org.apache.kafka.streams.kstream.TableJoined; import org.apache.kafka.streams.kstream.TimeWindows; import org.apache.kafka.streams.processor.StateStore; import org.apache.kafka.streams.processor.api.Processor;