From 8c061b74ca69cbd9b8103743b3aac72c1acd371f Mon Sep 17 00:00:00 2001 From: "A. Sophie Blee-Goldman" Date: Sun, 15 Dec 2024 02:04:34 -0800 Subject: [PATCH 1/3] convert to regular node --- .../CogroupedStreamAggregateBuilder.java | 123 +++++++----------- .../GroupedStreamAggregateBuilder.java | 9 +- .../kstream/internals/KGroupedTableImpl.java | 7 +- .../kstream/internals/KStreamImpl.java | 7 +- .../streams/kstream/internals/KTableImpl.java | 21 ++- .../internals/graph/GraphGraceSearchUtil.java | 4 +- .../graph/ProcessorToStateConnectorNode.java | 74 +++++++++++ .../graph/StatefulProcessorNode.java | 92 ------------- .../internals/graph/TableSuppressNode.java | 10 +- .../graph/GraphGraceSearchUtilTest.java | 43 +++--- 10 files changed, 167 insertions(+), 223 deletions(-) create mode 100644 streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/ProcessorToStateConnectorNode.java delete mode 100644 streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/StatefulProcessorNode.java diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/CogroupedStreamAggregateBuilder.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/CogroupedStreamAggregateBuilder.java index 126df7de17b8d..1ce1e7451a71d 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/CogroupedStreamAggregateBuilder.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/CogroupedStreamAggregateBuilder.java @@ -30,8 +30,6 @@ import org.apache.kafka.streams.kstream.internals.graph.OptimizableRepartitionNode.OptimizableRepartitionNodeBuilder; import org.apache.kafka.streams.kstream.internals.graph.ProcessorGraphNode; import org.apache.kafka.streams.kstream.internals.graph.ProcessorParameters; -import org.apache.kafka.streams.kstream.internals.graph.StatefulProcessorNode; -import org.apache.kafka.streams.processor.api.ProcessorSupplier; import org.apache.kafka.streams.processor.internals.StoreFactory; import java.util.ArrayList; @@ -61,24 +59,25 @@ KTable build(final Map, Aggregator processors = new ArrayList<>(); final Collection parentProcessors = new ArrayList<>(); - boolean stateCreated = false; + int counter = 0; for (final Entry, Aggregator> kGroupedStream : groupPatterns.entrySet()) { final KStreamAggProcessorSupplier parentProcessor = new KStreamAggregate<>(storeFactory, initializer, kGroupedStream.getValue()); parentProcessors.add(parentProcessor); - final StatefulProcessorNode statefulProcessorNode = getStatefulProcessorNode( - named.suffixWithOrElseGet( - "-cogroup-agg-" + counter++, - builder, - CogroupedKStreamImpl.AGGREGATE_NAME), - stateCreated, - storeFactory, - parentProcessor); - statefulProcessorNode.setOutputVersioned(isOutputVersioned); - stateCreated = true; - processors.add(statefulProcessorNode); - builder.addGraphNode(parentNodes.get(kGroupedStream.getKey()), statefulProcessorNode); + + final String kStreamAggProcessorName = named.suffixWithOrElseGet( + "-cogroup-agg-" + counter++, + builder, + CogroupedKStreamImpl.AGGREGATE_NAME); + final ProcessorGraphNode aggProcessorNode = + new ProcessorGraphNode<>( + kStreamAggProcessorName, + new ProcessorParameters<>(parentProcessor, kStreamAggProcessorName) + ); + aggProcessorNode.setOutputVersioned(isOutputVersioned); + processors.add(aggProcessorNode); + builder.addGraphNode(parentNodes.get(kGroupedStream.getKey()), aggProcessorNode); } return createTable(processors, parentProcessors, named, keySerde, valueSerde, queryableName, storeFactory.storeName()); } @@ -96,7 +95,6 @@ KTable build(final Map final Collection processors = new ArrayList<>(); final Collection parentProcessors = new ArrayList<>(); - boolean stateCreated = false; int counter = 0; for (final Entry, Aggregator> kGroupedStream : groupPatterns.entrySet()) { final KStreamAggProcessorSupplier parentProcessor = @@ -107,17 +105,18 @@ KTable build(final Map initializer, kGroupedStream.getValue()); parentProcessors.add(parentProcessor); - final StatefulProcessorNode statefulProcessorNode = getStatefulProcessorNode( - named.suffixWithOrElseGet( - "-cogroup-agg-" + counter++, - builder, - CogroupedKStreamImpl.AGGREGATE_NAME), - stateCreated, - storeFactory, - parentProcessor); - stateCreated = true; - processors.add(statefulProcessorNode); - builder.addGraphNode(parentNodes.get(kGroupedStream.getKey()), statefulProcessorNode); + + final String kStreamAggProcessorName = named.suffixWithOrElseGet( + "-cogroup-agg-" + counter++, + builder, + CogroupedKStreamImpl.AGGREGATE_NAME); + final ProcessorGraphNode aggProcessorNode = + new ProcessorGraphNode<>( + kStreamAggProcessorName, + new ProcessorParameters<>(parentProcessor, kStreamAggProcessorName) + ); + processors.add(aggProcessorNode); + builder.addGraphNode(parentNodes.get(kGroupedStream.getKey()), aggProcessorNode); } return createTable(processors, parentProcessors, named, keySerde, valueSerde, queryableName, storeFactory.storeName()); } @@ -135,7 +134,6 @@ KTable build(final Map, Aggregator processors = new ArrayList<>(); final Collection parentProcessors = new ArrayList<>(); - boolean stateCreated = false; int counter = 0; for (final Entry, Aggregator> kGroupedStream : groupPatterns.entrySet()) { final KStreamAggProcessorSupplier parentProcessor = @@ -147,17 +145,17 @@ KTable build(final Map, Aggregator statefulProcessorNode = getStatefulProcessorNode( - named.suffixWithOrElseGet( - "-cogroup-agg-" + counter++, - builder, - CogroupedKStreamImpl.AGGREGATE_NAME), - stateCreated, - storeFactory, - parentProcessor); - stateCreated = true; - processors.add(statefulProcessorNode); - builder.addGraphNode(parentNodes.get(kGroupedStream.getKey()), statefulProcessorNode); + final String kStreamAggProcessorName = named.suffixWithOrElseGet( + "-cogroup-agg-" + counter++, + builder, + CogroupedKStreamImpl.AGGREGATE_NAME); + final ProcessorGraphNode aggProcessorNode = + new ProcessorGraphNode<>( + kStreamAggProcessorName, + new ProcessorParameters<>(parentProcessor, kStreamAggProcessorName) + ); + processors.add(aggProcessorNode); + builder.addGraphNode(parentNodes.get(kGroupedStream.getKey()), aggProcessorNode); } return createTable(processors, parentProcessors, named, keySerde, valueSerde, queryableName, storeFactory.storeName()); } @@ -174,7 +172,6 @@ KTable build(final Map, Aggregator parentProcessors = new ArrayList<>(); final Collection processors = new ArrayList<>(); - boolean stateCreated = false; int counter = 0; for (final Entry, Aggregator> kGroupedStream : groupPatterns.entrySet()) { final KStreamAggProcessorSupplier parentProcessor = @@ -186,17 +183,17 @@ KTable build(final Map, Aggregator statefulProcessorNode = getStatefulProcessorNode( - named.suffixWithOrElseGet( - "-cogroup-agg-" + counter++, - builder, - CogroupedKStreamImpl.AGGREGATE_NAME), - stateCreated, - storeFactory, - parentProcessor); - stateCreated = true; - processors.add(statefulProcessorNode); - builder.addGraphNode(parentNodes.get(kGroupedStream.getKey()), statefulProcessorNode); + final String kStreamAggProcessorName = named.suffixWithOrElseGet( + "-cogroup-agg-" + counter++, + builder, + CogroupedKStreamImpl.AGGREGATE_NAME); + final ProcessorGraphNode aggProcessorNode = + new ProcessorGraphNode<>( + kStreamAggProcessorName, + new ProcessorParameters<>(parentProcessor, kStreamAggProcessorName) + ); + processors.add(aggProcessorNode); + builder.addGraphNode(parentNodes.get(kGroupedStream.getKey()), aggProcessorNode); } return createTable(processors, parentProcessors, named, keySerde, valueSerde, queryableName, storeFactory.storeName()); } @@ -262,30 +259,6 @@ KTable createTable(final Collection processors, builder); } - private StatefulProcessorNode getStatefulProcessorNode(final String processorName, - final boolean stateCreated, - final StoreFactory storeFactory, - final ProcessorSupplier kStreamAggregate) { - final StatefulProcessorNode statefulProcessorNode; - if (!stateCreated) { - statefulProcessorNode = - new StatefulProcessorNode<>( - processorName, - new ProcessorParameters<>(kStreamAggregate, processorName), - storeFactory - ); - } else { - statefulProcessorNode = - new StatefulProcessorNode<>( - processorName, - new ProcessorParameters<>(kStreamAggregate, processorName), - new String[]{storeFactory.storeName()} - ); - } - - return statefulProcessorNode; - } - @SuppressWarnings("unchecked") private void createRepartitionSource(final String repartitionTopicNamePrefix, final OptimizableRepartitionNodeBuilder optimizableRepartitionNodeBuilder, diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/GroupedStreamAggregateBuilder.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/GroupedStreamAggregateBuilder.java index c3360c9c0133b..bc84e69a1ff6d 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/GroupedStreamAggregateBuilder.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/GroupedStreamAggregateBuilder.java @@ -21,8 +21,8 @@ import org.apache.kafka.streams.kstream.Initializer; import org.apache.kafka.streams.kstream.KTable; import org.apache.kafka.streams.kstream.internals.graph.GraphNode; +import org.apache.kafka.streams.kstream.internals.graph.ProcessorGraphNode; import org.apache.kafka.streams.kstream.internals.graph.ProcessorParameters; -import org.apache.kafka.streams.kstream.internals.graph.StatefulProcessorNode; import org.apache.kafka.streams.processor.internals.StoreFactory; import java.util.Collections; @@ -97,11 +97,10 @@ KTable build(final NamedInternal functionName, parentNode = repartitionNode; } - final StatefulProcessorNode statefulProcessorNode = - new StatefulProcessorNode<>( + final ProcessorGraphNode statefulProcessorNode = + new ProcessorGraphNode<>( aggFunctionName, - new ProcessorParameters<>(aggregateSupplier, aggFunctionName), - new String[] {storeFactory.storeName()} + new ProcessorParameters<>(aggregateSupplier, aggFunctionName) ); statefulProcessorNode.setOutputVersioned(isOutputVersioned); diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImpl.java index e500582244bdc..fbce445e7ee0c 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImpl.java @@ -27,8 +27,8 @@ import org.apache.kafka.streams.kstream.Reducer; import org.apache.kafka.streams.kstream.internals.graph.GraphNode; import org.apache.kafka.streams.kstream.internals.graph.GroupedTableOperationRepartitionNode; +import org.apache.kafka.streams.kstream.internals.graph.ProcessorGraphNode; import org.apache.kafka.streams.kstream.internals.graph.ProcessorParameters; -import org.apache.kafka.streams.kstream.internals.graph.StatefulProcessorNode; import org.apache.kafka.streams.processor.api.ProcessorSupplier; import org.apache.kafka.streams.state.KeyValueStore; import org.apache.kafka.streams.state.VersionedBytesStoreSupplier; @@ -88,10 +88,9 @@ private KTable doAggregate(final ProcessorSupplier, // the passed in StreamsGraphNode must be the parent of the repartition node builder.addGraphNode(this.graphNode, repartitionGraphNode); - final StatefulProcessorNode statefulProcessorNode = new StatefulProcessorNode<>( + final ProcessorGraphNode statefulProcessorNode = new ProcessorGraphNode<>( funcName, - new ProcessorParameters<>(aggregateSupplier, funcName), - new String[]{materialized.storeName()} + new ProcessorParameters<>(aggregateSupplier, funcName) ); statefulProcessorNode.setOutputVersioned(materialized.storeSupplier() instanceof VersionedBytesStoreSupplier); diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java index 0ba9086e45057..7deb9468c3fbc 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java @@ -48,7 +48,7 @@ import org.apache.kafka.streams.kstream.internals.graph.OptimizableRepartitionNode.OptimizableRepartitionNodeBuilder; import org.apache.kafka.streams.kstream.internals.graph.ProcessorGraphNode; import org.apache.kafka.streams.kstream.internals.graph.ProcessorParameters; -import org.apache.kafka.streams.kstream.internals.graph.StatefulProcessorNode; +import org.apache.kafka.streams.kstream.internals.graph.ProcessorToStateConnectorNode; import org.apache.kafka.streams.kstream.internals.graph.StreamSinkNode; import org.apache.kafka.streams.kstream.internals.graph.StreamTableJoinNode; import org.apache.kafka.streams.kstream.internals.graph.StreamToTableNode; @@ -1225,7 +1225,8 @@ public KStream process( } final String name = new NamedInternal(named).name(); - final StatefulProcessorNode processNode = new StatefulProcessorNode<>( + final ProcessorToStateConnectorNode processNode = new + ProcessorToStateConnectorNode<>( name, new ProcessorParameters<>(processorSupplier, name), stateStoreNames); @@ -1270,7 +1271,7 @@ public KStream processValues( } final String name = new NamedInternal(named).name(); - final StatefulProcessorNode processNode = new StatefulProcessorNode<>( + final ProcessorToStateConnectorNode processNode = new ProcessorToStateConnectorNode<>( name, new ProcessorParameters<>(processorSupplier, name), stateStoreNames); diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java index 5367b8dede217..eacb9045efbe8 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java @@ -54,7 +54,7 @@ import org.apache.kafka.streams.kstream.internals.graph.KTableKTableJoinNode; import org.apache.kafka.streams.kstream.internals.graph.ProcessorGraphNode; import org.apache.kafka.streams.kstream.internals.graph.ProcessorParameters; -import org.apache.kafka.streams.kstream.internals.graph.StatefulProcessorNode; +import org.apache.kafka.streams.kstream.internals.graph.ProcessorToStateConnectorNode; import org.apache.kafka.streams.kstream.internals.graph.StreamSinkNode; import org.apache.kafka.streams.kstream.internals.graph.StreamSourceNode; import org.apache.kafka.streams.kstream.internals.graph.TableFilterNode; @@ -574,9 +574,9 @@ public KTable suppress(final Suppressed suppressed) { final ProcessorGraphNode> node = new TableSuppressNode<>( name, - new ProcessorParameters<>(suppressionSupplier, name), - new String[]{storeName} + new ProcessorParameters<>(suppressionSupplier, name) ); + node.setOutputVersioned(false); builder.addGraphNode(graphNode, node); @@ -1235,26 +1235,24 @@ private KTable doJoinOnForeignKey(final KTable forei final String subscriptionReceiveName = renamed.suffixWithOrElseGet( "-subscription-receive", builder, SUBSCRIPTION_PROCESSOR); - final StatefulProcessorNode> subscriptionReceiveNode = - new StatefulProcessorNode<>( + final ProcessorGraphNode> subscriptionReceiveNode = + new ProcessorGraphNode<>( subscriptionReceiveName, new ProcessorParameters<>( new SubscriptionReceiveProcessorSupplier<>(subscriptionStoreFactory, combinedKeySchema), - subscriptionReceiveName), - new String[]{subscriptionStoreName} + subscriptionReceiveName) ); builder.addGraphNode(subscriptionSource, subscriptionReceiveNode); final KTableValueGetterSupplier foreignKeyValueGetter = ((KTableImpl) foreignKeyTable).valueGetterSupplier(); - final StatefulProcessorNode, Change>>> subscriptionJoinNode = - new StatefulProcessorNode<>( + final ProcessorToStateConnectorNode, Change>>> subscriptionJoinNode = + new ProcessorToStateConnectorNode<>( new ProcessorParameters<>( new SubscriptionJoinProcessorSupplier<>( foreignKeyValueGetter ), renamed.suffixWithOrElseGet("-subscription-join-foreign", builder, SUBSCRIPTION_PROCESSOR) ), - Collections.emptySet(), Collections.singleton(foreignKeyValueGetter) ); builder.addGraphNode(subscriptionReceiveNode, subscriptionJoinNode); @@ -1306,7 +1304,7 @@ private KTable doJoinOnForeignKey(final KTable forei builder.internalTopologyBuilder.copartitionSources(resultSourceNodes); final KTableValueGetterSupplier primaryKeyValueGetter = valueGetterSupplier(); - final StatefulProcessorNode> responseJoinNode = new StatefulProcessorNode<>( + final ProcessorToStateConnectorNode> responseJoinNode = new ProcessorToStateConnectorNode<>( new ProcessorParameters<>( new ResponseJoinProcessorSupplier<>( primaryKeyValueGetter, @@ -1317,7 +1315,6 @@ private KTable doJoinOnForeignKey(final KTable forei ), renamed.suffixWithOrElseGet("-subscription-response-resolver", builder, SUBSCRIPTION_RESPONSE_RESOLVER_PROCESSOR) ), - Collections.emptySet(), Collections.singleton(primaryKeyValueGetter) ); builder.addGraphNode(foreignResponseSource, responseJoinNode); diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/GraphGraceSearchUtil.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/GraphGraceSearchUtil.java index 66ffdc003ae30..d58b3a3b1a955 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/GraphGraceSearchUtil.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/GraphGraceSearchUtil.java @@ -72,8 +72,8 @@ private static long findAndVerifyWindowGrace(final GraphNode graphNode, final St @SuppressWarnings("rawtypes") private static Long extractGracePeriod(final GraphNode node) { - if (node instanceof StatefulProcessorNode) { - final ProcessorSupplier processorSupplier = ((StatefulProcessorNode) node).processorParameters().processorSupplier(); + if (node instanceof ProcessorGraphNode) { + final ProcessorSupplier processorSupplier = ((ProcessorGraphNode) node).processorParameters().processorSupplier(); if (processorSupplier instanceof KStreamWindowAggregate) { final KStreamWindowAggregate kStreamWindowAggregate = (KStreamWindowAggregate) processorSupplier; final Windows windows = kStreamWindowAggregate.windows(); diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/ProcessorToStateConnectorNode.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/ProcessorToStateConnectorNode.java new file mode 100644 index 0000000000000..b476d6a7731b2 --- /dev/null +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/ProcessorToStateConnectorNode.java @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.kstream.internals.graph; + +import org.apache.kafka.streams.kstream.internals.KTableValueGetterSupplier; +import org.apache.kafka.streams.processor.ConnectedStoreProvider; +import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder; + +import java.util.Arrays; +import java.util.Set; + +/** + * Used for stateful processors that need to be manually connected to the state store(s) + * they need to access. This should only be used in cases where the stores) cannot + * be connected automatically by implementing the {@link ConnectedStoreProvider#stores()} method + * and returning the store directly. Generally this will only apply to DSL operators that utilize + * value getters to access another processor's state store(s), and the process/processValues + * operator where the user's custom processor supplier doesn't implement the #stores method + * and has to be connected to the store when compiling the topology. + */ +public class ProcessorToStateConnectorNode extends ProcessorGraphNode { + + private final String[] storeNames; + + /** + * Create a node representing a stateful processor that uses value getters to access stores, and needs to + * be connected with those stores + */ + public ProcessorToStateConnectorNode(final ProcessorParameters processorParameters, + final Set> valueGetterSuppliers) { + super(processorParameters.processorName(), processorParameters); + storeNames = valueGetterSuppliers.stream().flatMap(s -> Arrays.stream(s.storeNames())).toArray(String[]::new); + } + + /** + * Create a node representing a stateful processor, which needs to be connected to the provided stores + */ + public ProcessorToStateConnectorNode(final String nodeName, + final ProcessorParameters processorParameters, + final String[] storeNames) { + super(nodeName, processorParameters); + this.storeNames = storeNames; + } + + @Override + public String toString() { + return "ProcessorNode{" + + "storeNames=" + Arrays.toString(storeNames) + + "} " + super.toString(); + } + + @Override + public void writeToTopology(final InternalTopologyBuilder topologyBuilder) { + processorParameters().addProcessorTo(topologyBuilder, parentNodeNames()); + + if (storeNames != null && storeNames.length > 0) { + topologyBuilder.connectProcessorAndStateStores(processorParameters().processorName(), storeNames); + } + } +} diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/StatefulProcessorNode.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/StatefulProcessorNode.java deleted file mode 100644 index ec6c6583b3efe..0000000000000 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/StatefulProcessorNode.java +++ /dev/null @@ -1,92 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.kafka.streams.kstream.internals.graph; - -import org.apache.kafka.streams.kstream.internals.KTableValueGetterSupplier; -import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder; -import org.apache.kafka.streams.processor.internals.StoreFactory; -import org.apache.kafka.streams.state.StoreBuilder; - -import java.util.Arrays; -import java.util.Set; -import java.util.stream.Stream; - -public class StatefulProcessorNode extends ProcessorGraphNode { - - private final String[] storeNames; - private final StoreFactory storeFactory; - - /** - * Create a node representing a stateful processor, where the named stores have already been registered. - */ - public StatefulProcessorNode(final ProcessorParameters processorParameters, - final Set> preRegisteredStores, - final Set> valueGetterSuppliers) { - super(processorParameters.processorName(), processorParameters); - final Stream registeredStoreNames = preRegisteredStores.stream().map(StoreBuilder::name); - final Stream valueGetterStoreNames = valueGetterSuppliers.stream().flatMap(s -> Arrays.stream(s.storeNames())); - storeNames = Stream.concat(registeredStoreNames, valueGetterStoreNames).toArray(String[]::new); - storeFactory = null; - } - - /** - * Create a node representing a stateful processor, where the named stores have already been registered. - */ - public StatefulProcessorNode(final String nodeName, - final ProcessorParameters processorParameters, - final String[] storeNames) { - super(nodeName, processorParameters); - - this.storeNames = storeNames; - this.storeFactory = null; - } - - - /** - * Create a node representing a stateful processor, - * where the store needs to be built and registered as part of building this node. - */ - public StatefulProcessorNode(final String nodeName, - final ProcessorParameters processorParameters, - final StoreFactory materializedKTableStoreBuilder) { - super(nodeName, processorParameters); - - this.storeNames = null; - this.storeFactory = materializedKTableStoreBuilder; - } - - @Override - public String toString() { - return "StatefulProcessorNode{" + - "storeNames=" + Arrays.toString(storeNames) + - ", storeBuilder=" + storeFactory + - "} " + super.toString(); - } - - @Override - public void writeToTopology(final InternalTopologyBuilder topologyBuilder) { - processorParameters().addProcessorTo(topologyBuilder, parentNodeNames()); - - if (storeNames != null && storeNames.length > 0) { - topologyBuilder.connectProcessorAndStateStores(processorParameters().processorName(), storeNames); - } - - if (storeFactory != null) { - topologyBuilder.addStateStore(storeFactory, processorParameters().processorName()); - } - } -} diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/TableSuppressNode.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/TableSuppressNode.java index 595d0266aaea5..ac4a3f25c37e8 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/TableSuppressNode.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/TableSuppressNode.java @@ -16,10 +16,12 @@ */ package org.apache.kafka.streams.kstream.internals.graph; -public class TableSuppressNode extends StatefulProcessorNode { +/** + * Marker interface to identify suppression nodes since they have some special requirements + */ +public class TableSuppressNode extends ProcessorGraphNode { public TableSuppressNode(final String nodeName, - final ProcessorParameters processorParameters, - final String[] storeNames) { - super(nodeName, processorParameters, storeNames); + final ProcessorParameters processorParameters) { + super(nodeName, processorParameters); } } diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/graph/GraphGraceSearchUtilTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/graph/GraphGraceSearchUtilTest.java index 8552790692c1d..182dec1ef14c9 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/graph/GraphGraceSearchUtilTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/graph/GraphGraceSearchUtilTest.java @@ -25,7 +25,6 @@ import org.apache.kafka.streams.kstream.internals.TimeWindow; import org.apache.kafka.streams.processor.api.Processor; import org.apache.kafka.streams.processor.api.Record; -import org.apache.kafka.streams.processor.internals.StoreFactory; import org.junit.jupiter.api.Test; @@ -50,8 +49,8 @@ public void shouldThrowOnNull() { public void shouldFailIfThereIsNoGraceAncestor() { // doesn't matter if this ancestor is stateless or stateful. The important thing it that there is // no grace period defined on any ancestor of the node - final StatefulProcessorNode gracelessAncestor = new StatefulProcessorNode<>( - "stateful", + final ProcessorGraphNode gracelessAncestor = new ProcessorGraphNode<>( + "stateless", new ProcessorParameters<>( () -> new Processor() { @@ -60,8 +59,7 @@ public void process(final Record record) {} }, "dummy" - ), - (StoreFactory) null + ) ); final ProcessorGraphNode node = new ProcessorGraphNode<>("stateless", null); @@ -78,7 +76,7 @@ public void process(final Record record) {} @Test public void shouldExtractGraceFromKStreamWindowAggregateNode() { final TimeWindows windows = TimeWindows.ofSizeAndGrace(ofMillis(10L), ofMillis(1234L)); - final StatefulProcessorNode node = new StatefulProcessorNode<>( + final ProcessorGraphNode node = new ProcessorGraphNode<>( "asdf", new ProcessorParameters<>( new KStreamWindowAggregate( @@ -89,8 +87,7 @@ public void shouldExtractGraceFromKStreamWindowAggregateNode() { null ), "asdf" - ), - (StoreFactory) null + ) ); final long extracted = GraphGraceSearchUtil.findAndVerifyWindowGrace(node); @@ -101,7 +98,7 @@ public void shouldExtractGraceFromKStreamWindowAggregateNode() { public void shouldExtractGraceFromKStreamSessionWindowAggregateNode() { final SessionWindows windows = SessionWindows.ofInactivityGapAndGrace(ofMillis(10L), ofMillis(1234L)); - final StatefulProcessorNode node = new StatefulProcessorNode<>( + final ProcessorGraphNode node = new ProcessorGraphNode<>( "asdf", new ProcessorParameters<>( new KStreamSessionWindowAggregate( @@ -113,8 +110,7 @@ public void shouldExtractGraceFromKStreamSessionWindowAggregateNode() { null ), "asdf" - ), - (StoreFactory) null + ) ); final long extracted = GraphGraceSearchUtil.findAndVerifyWindowGrace(node); @@ -124,15 +120,14 @@ public void shouldExtractGraceFromKStreamSessionWindowAggregateNode() { @Test public void shouldExtractGraceFromSessionAncestorThroughStatefulParent() { final SessionWindows windows = SessionWindows.ofInactivityGapAndGrace(ofMillis(10L), ofMillis(1234L)); - final StatefulProcessorNode graceGrandparent = new StatefulProcessorNode<>( + final ProcessorGraphNode graceGrandparent = new ProcessorGraphNode<>( "asdf", new ProcessorParameters<>(new KStreamSessionWindowAggregate( windows, mockStoreFactory("asdf"), EmitStrategy.onWindowUpdate(), null, null, null - ), "asdf"), - (StoreFactory) null + ), "asdf") ); - final StatefulProcessorNode statefulParent = new StatefulProcessorNode<>( + final ProcessorGraphNode statefulParent = new ProcessorGraphNode<>( "stateful", new ProcessorParameters<>( () -> new Processor() { @@ -142,8 +137,7 @@ public void process(final Record record) {} }, "dummy" - ), - (StoreFactory) null + ) ); graceGrandparent.addChild(statefulParent); @@ -157,7 +151,7 @@ public void process(final Record record) {} @Test public void shouldExtractGraceFromSessionAncestorThroughStatelessParent() { final SessionWindows windows = SessionWindows.ofInactivityGapAndGrace(ofMillis(10L), ofMillis(1234L)); - final StatefulProcessorNode graceGrandparent = new StatefulProcessorNode<>( + final ProcessorGraphNode graceGrandparent = new ProcessorGraphNode<>( "asdf", new ProcessorParameters<>( new KStreamSessionWindowAggregate( @@ -169,8 +163,7 @@ public void shouldExtractGraceFromSessionAncestorThroughStatelessParent() { null ), "asdf" - ), - (StoreFactory) null + ) ); final ProcessorGraphNode statelessParent = new ProcessorGraphNode<>("stateless", null); @@ -185,7 +178,7 @@ public void shouldExtractGraceFromSessionAncestorThroughStatelessParent() { @Test public void shouldUseMaxIfMultiParentsDoNotAgreeOnGrace() { - final StatefulProcessorNode leftParent = new StatefulProcessorNode<>( + final ProcessorGraphNode leftParent = new ProcessorGraphNode<>( "asdf", new ProcessorParameters<>( new KStreamSessionWindowAggregate( @@ -197,11 +190,10 @@ public void shouldUseMaxIfMultiParentsDoNotAgreeOnGrace() { null ), "asdf" - ), - (StoreFactory) null + ) ); - final StatefulProcessorNode rightParent = new StatefulProcessorNode<>( + final ProcessorGraphNode rightParent = new ProcessorGraphNode<>( "asdf", new ProcessorParameters<>( new KStreamWindowAggregate( @@ -212,8 +204,7 @@ public void shouldUseMaxIfMultiParentsDoNotAgreeOnGrace() { null ), "asdf" - ), - (StoreFactory) null + ) ); final ProcessorGraphNode node = new ProcessorGraphNode<>("stateless", null); From a3238e30900560bf12ae7551156bf5cbc5c23bc8 Mon Sep 17 00:00:00 2001 From: "A. Sophie Blee-Goldman" Date: Sat, 28 Dec 2024 15:41:53 -0800 Subject: [PATCH 2/3] fix test and constructor cleanup --- .../ForeignJoinSubscriptionSendNode.java | 2 +- .../internals/graph/ProcessorGraphNode.java | 7 -- .../graph/GraphGraceSearchUtilTest.java | 69 +++++++++++++++++-- 3 files changed, 63 insertions(+), 15 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/ForeignJoinSubscriptionSendNode.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/ForeignJoinSubscriptionSendNode.java index 4efbd9b29f1c1..afd9ee1e64d8d 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/ForeignJoinSubscriptionSendNode.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/ForeignJoinSubscriptionSendNode.java @@ -22,7 +22,7 @@ public class ForeignJoinSubscriptionSendNode extends ProcessorGraphNode implements VersionedSemanticsGraphNode { public ForeignJoinSubscriptionSendNode(final ProcessorParameters processorParameters) { - super(processorParameters); + super(processorParameters.processorName(), processorParameters); } @SuppressWarnings("unchecked") diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/ProcessorGraphNode.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/ProcessorGraphNode.java index 1c8e8cace2b30..514676af1f6d4 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/ProcessorGraphNode.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/ProcessorGraphNode.java @@ -28,13 +28,6 @@ public class ProcessorGraphNode extends GraphNode { private final ProcessorParameters processorParameters; - public ProcessorGraphNode(final ProcessorParameters processorParameters) { - - super(processorParameters.processorName()); - - this.processorParameters = processorParameters; - } - public ProcessorGraphNode(final String nodeName, final ProcessorParameters processorParameters) { diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/graph/GraphGraceSearchUtilTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/graph/GraphGraceSearchUtilTest.java index 182dec1ef14c9..5db1439835cb2 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/graph/GraphGraceSearchUtilTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/graph/GraphGraceSearchUtilTest.java @@ -50,6 +50,18 @@ public void shouldFailIfThereIsNoGraceAncestor() { // doesn't matter if this ancestor is stateless or stateful. The important thing it that there is // no grace period defined on any ancestor of the node final ProcessorGraphNode gracelessAncestor = new ProcessorGraphNode<>( + "graceless", + new ProcessorParameters<>( + () -> new Processor() { + @Override + public void process(final Record record) {} + + }, + "graceless" + ) + ); + + final ProcessorGraphNode node = new ProcessorGraphNode<>( "stateless", new ProcessorParameters<>( () -> new Processor() { @@ -58,18 +70,17 @@ public void shouldFailIfThereIsNoGraceAncestor() { public void process(final Record record) {} }, - "dummy" + "stateless" ) ); - final ProcessorGraphNode node = new ProcessorGraphNode<>("stateless", null); gracelessAncestor.addChild(node); try { GraphGraceSearchUtil.findAndVerifyWindowGrace(node); fail("should have thrown."); } catch (final TopologyException e) { - assertThat(e.getMessage(), is("Invalid topology: Window close time is only defined for windowed computations. Got [stateful->stateless].")); + assertThat(e.getMessage(), is("Invalid topology: Window close time is only defined for windowed computations. Got [graceless->stateless].")); } } @@ -141,7 +152,18 @@ public void process(final Record record) {} ); graceGrandparent.addChild(statefulParent); - final ProcessorGraphNode node = new ProcessorGraphNode<>("stateless", null); + final ProcessorGraphNode node = new ProcessorGraphNode<>( + "stateless", + new ProcessorParameters<>( + () -> new Processor() { + + @Override + public void process(final Record record) {} + + }, + "dummyChild-graceless" + ) + ); statefulParent.addChild(node); final long extracted = GraphGraceSearchUtil.findAndVerifyWindowGrace(node); @@ -166,10 +188,32 @@ public void shouldExtractGraceFromSessionAncestorThroughStatelessParent() { ) ); - final ProcessorGraphNode statelessParent = new ProcessorGraphNode<>("stateless", null); + final ProcessorGraphNode statelessParent = new ProcessorGraphNode<>( + "statelessParent", + new ProcessorParameters<>( + () -> new Processor() { + + @Override + public void process(final Record record) {} + + }, + "statelessParent" + ) + ); graceGrandparent.addChild(statelessParent); - final ProcessorGraphNode node = new ProcessorGraphNode<>("stateless", null); + final ProcessorGraphNode node = new ProcessorGraphNode<>( + "stateless", + new ProcessorParameters<>( + () -> new Processor() { + + @Override + public void process(final Record record) {} + + }, + "stateless" + ) + ); statelessParent.addChild(node); final long extracted = GraphGraceSearchUtil.findAndVerifyWindowGrace(node); @@ -207,7 +251,18 @@ public void shouldUseMaxIfMultiParentsDoNotAgreeOnGrace() { ) ); - final ProcessorGraphNode node = new ProcessorGraphNode<>("stateless", null); + final ProcessorGraphNode node = new ProcessorGraphNode<>( + "stateless", + new ProcessorParameters<>( + () -> new Processor() { + + @Override + public void process(final Record record) {} + + }, + "stateless" + ) + ); leftParent.addChild(node); rightParent.addChild(node); From ad7524fe946efa5c415a50dfa0e3c0e127c60579 Mon Sep 17 00:00:00 2001 From: "A. Sophie Blee-Goldman" Date: Sat, 28 Dec 2024 17:25:55 -0800 Subject: [PATCH 3/3] clean up TableProcessorNode too --- .../streams/kstream/internals/KTableImpl.java | 28 ++++--- .../internals/graph/TableFilterNode.java | 4 +- .../internals/graph/TableProcessorNode.java | 76 ------------------- .../graph/TableProcessorNodeTest.java | 55 -------------- 4 files changed, 19 insertions(+), 144 deletions(-) delete mode 100644 streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/TableProcessorNode.java delete mode 100644 streams/src/test/java/org/apache/kafka/streams/kstream/internals/graph/TableProcessorNodeTest.java diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java index eacb9045efbe8..06a6043ffd386 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java @@ -58,7 +58,6 @@ import org.apache.kafka.streams.kstream.internals.graph.StreamSinkNode; import org.apache.kafka.streams.kstream.internals.graph.StreamSourceNode; import org.apache.kafka.streams.kstream.internals.graph.TableFilterNode; -import org.apache.kafka.streams.kstream.internals.graph.TableProcessorNode; import org.apache.kafka.streams.kstream.internals.graph.TableRepartitionMapNode; import org.apache.kafka.streams.kstream.internals.graph.TableSuppressNode; import org.apache.kafka.streams.kstream.internals.suppress.FinalResultsSuppressionBuilder; @@ -69,7 +68,9 @@ import org.apache.kafka.streams.processor.api.ProcessorSupplier; import org.apache.kafka.streams.processor.internals.InternalTopicProperties; import org.apache.kafka.streams.processor.internals.StaticTopicNameExtractor; +import org.apache.kafka.streams.processor.internals.StoreDelegatingProcessorSupplier; import org.apache.kafka.streams.processor.internals.StoreFactory; +import org.apache.kafka.streams.processor.internals.StoreFactory.FactoryWrappingStoreBuilder; import org.apache.kafka.streams.state.KeyValueStore; import org.apache.kafka.streams.state.StoreBuilder; import org.apache.kafka.streams.state.ValueAndTimestamp; @@ -312,7 +313,7 @@ private KTable doMapValues(final ValueMapperWithKey processorParameters = unsafeCastProcessorParametersToCompletelyDifferentType( new ProcessorParameters<>(processorSupplier, name) ); - final GraphNode tableNode = new TableProcessorNode<>( + final GraphNode tableNode = new ProcessorGraphNode<>( name, processorParameters ); @@ -439,7 +440,7 @@ private KTable doTransformValues(final ValueTransformerWithKeySuppli final Serde keySerde; final Serde valueSerde; final String queryableStoreName; - final StoreFactory storeFactory; + final Set> storeBuilder; if (materializedInternal != null) { // don't inherit parent value serde, since this operation may change the value type, more specifically: @@ -449,12 +450,13 @@ private KTable doTransformValues(final ValueTransformerWithKeySuppli valueSerde = materializedInternal.valueSerde(); queryableStoreName = materializedInternal.queryableStoreName(); // only materialize if materialized is specified and it has queryable name - storeFactory = queryableStoreName != null ? (new KeyValueStoreMaterializer<>(materializedInternal)) : null; + final StoreFactory storeFactory = queryableStoreName != null ? (new KeyValueStoreMaterializer<>(materializedInternal)) : null; + storeBuilder = Collections.singleton(new FactoryWrappingStoreBuilder<>(storeFactory)); } else { keySerde = this.keySerde; valueSerde = null; queryableStoreName = null; - storeFactory = null; + storeBuilder = null; } final String name = namedInternal.orElseGenerateWithPrefix(builder, TRANSFORMVALUES_NAME); @@ -464,14 +466,18 @@ private KTable doTransformValues(final ValueTransformerWithKeySuppli transformerSupplier, queryableStoreName); - final ProcessorParameters processorParameters = unsafeCastProcessorParametersToCompletelyDifferentType( - new ProcessorParameters<>(processorSupplier, name) - ); + final ProcessorParameters processorParameters = + unsafeCastProcessorParametersToCompletelyDifferentType( + new ProcessorParameters<>( + new StoreDelegatingProcessorSupplier<>( + processorSupplier, + storeBuilder), + name + )); - final GraphNode tableNode = new TableProcessorNode<>( + final GraphNode tableNode = new ProcessorToStateConnectorNode<>( name, processorParameters, - storeFactory, stateStoreNames ); maybeSetOutputVersioned(tableNode, materializedInternal); @@ -1336,7 +1342,7 @@ private KTable doJoinOnForeignKey(final KTable forei final KTableSource resultProcessorSupplier = new KTableSource<>(materializedInternal); - final TableProcessorNode resultNode = new TableProcessorNode<>( + final ProcessorGraphNode resultNode = new ProcessorGraphNode<>( resultProcessorName, new ProcessorParameters<>( resultProcessorSupplier, diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/TableFilterNode.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/TableFilterNode.java index 6fef05604cf68..a921dab0d1a06 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/TableFilterNode.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/TableFilterNode.java @@ -20,11 +20,11 @@ import org.apache.kafka.streams.kstream.internals.KTableFilter; import org.apache.kafka.streams.processor.api.ProcessorSupplier; -public class TableFilterNode extends TableProcessorNode implements VersionedSemanticsGraphNode { +public class TableFilterNode extends ProcessorGraphNode implements VersionedSemanticsGraphNode { public TableFilterNode(final String nodeName, final ProcessorParameters processorParameters) { - super(nodeName, processorParameters, null, null); + super(nodeName, processorParameters); } @SuppressWarnings("unchecked") diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/TableProcessorNode.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/TableProcessorNode.java deleted file mode 100644 index af3ab15d4903c..0000000000000 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/TableProcessorNode.java +++ /dev/null @@ -1,76 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.kafka.streams.kstream.internals.graph; - -import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder; -import org.apache.kafka.streams.processor.internals.StoreFactory; - -import java.util.Arrays; - -public class TableProcessorNode extends GraphNode { - - private final ProcessorParameters processorParameters; - private final StoreFactory storeFactory; - private final String[] storeNames; - - public TableProcessorNode(final String nodeName, - final ProcessorParameters processorParameters) { - this(nodeName, processorParameters, null, null); - } - - public TableProcessorNode(final String nodeName, - final ProcessorParameters processorParameters, - final StoreFactory storeFactory, - final String[] storeNames) { - super(nodeName); - this.processorParameters = processorParameters; - this.storeFactory = storeFactory; - this.storeNames = storeNames != null ? storeNames : new String[] {}; - } - - public ProcessorParameters processorParameters() { - return processorParameters; - } - - @Override - public String toString() { - return "TableProcessorNode{" + - ", processorParameters=" + processorParameters + - ", storeFactory=" + (storeFactory == null ? "null" : storeFactory.storeName()) + - ", storeNames=" + Arrays.toString(storeNames) + - "} " + super.toString(); - } - - @SuppressWarnings("unchecked") - @Override - public void writeToTopology(final InternalTopologyBuilder topologyBuilder) { - processorParameters.addProcessorTo(topologyBuilder, parentNodeNames()); - - final String processorName = processorParameters.processorName(); - - if (storeNames.length > 0) { - // todo(rodesai): remove me once all operators have been moved to ProcessorSupplier - topologyBuilder.connectProcessorAndStateStores(processorName, storeNames); - } - - if (storeFactory != null) { - // todo(rodesai) remove when KTableImpl#doFilter, KTableImpl#doTransformValues moved to ProcessorSupplier - topologyBuilder.addStateStore(storeFactory, processorName); - } - } -} diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/graph/TableProcessorNodeTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/graph/TableProcessorNodeTest.java deleted file mode 100644 index 36a32d01dc2fd..0000000000000 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/graph/TableProcessorNodeTest.java +++ /dev/null @@ -1,55 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.kafka.streams.kstream.internals.graph; - -import org.apache.kafka.streams.processor.api.Processor; -import org.apache.kafka.streams.processor.api.Record; - -import org.junit.jupiter.api.Test; - -import static org.junit.jupiter.api.Assertions.assertTrue; - -public class TableProcessorNodeTest { - private static class TestProcessor implements Processor { - - @Override - public void process(final Record record) { - } - - } - - @Test - public void shouldConvertToStringWithNullStoreBuilder() { - final TableProcessorNode node = new TableProcessorNode<>( - "name", - new ProcessorParameters<>(TestProcessor::new, "processor"), - null, - new String[]{"store1", "store2"} - ); - - final String asString = node.toString(); - final String expected = "storeFactory=null"; - assertTrue( - asString.contains(expected), - String.format( - "Expected toString to return string with \"%s\", received: %s", - expected, - asString) - ); - } -} \ No newline at end of file