From e172ac9f9a36ff293e9c9e491e433cffc1bbbfdb Mon Sep 17 00:00:00 2001 From: Bill Bejeck Date: Fri, 3 Apr 2020 15:14:23 -0400 Subject: [PATCH 1/2] KAFKA-9739: Fixes null key changing child node (#8416) 2.4 port of #8400 since cherry-picking not possible Reviewers: John Roesler --- .../internals/InternalStreamsBuilder.java | 17 +- .../internals/graph/StreamsGraphTest.java | 186 ++++++++++++++++++ 2 files changed, 198 insertions(+), 5 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java index e48a3fc538050..ab9d230b66305 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java @@ -382,11 +382,13 @@ private void maybeUpdateKeyChangingRepartitionNodeMap() { final Set mergeNodeKeyChangingParentsToRemove = new HashSet<>(); for (final StreamsGraphNode mergeNode : mergeNodes) { mergeNodesToKeyChangers.put(mergeNode, new LinkedHashSet<>()); - final Collection keys = keyChangingOperationsToOptimizableRepartitionNodes.keySet(); - for (final StreamsGraphNode key : keys) { - final StreamsGraphNode maybeParentKey = findParentNodeMatching(mergeNode, node -> node.parentNodes().contains(key)); - if (maybeParentKey != null) { - mergeNodesToKeyChangers.get(mergeNode).add(key); + final Set>> entrySet = keyChangingOperationsToOptimizableRepartitionNodes.entrySet(); + for (final Map.Entry> entry : entrySet) { + if (mergeNodeHasRepartitionChildren(mergeNode, entry.getValue())) { + final StreamsGraphNode maybeParentKey = findParentNodeMatching(mergeNode, node -> node.parentNodes().contains(entry.getKey())); + if (maybeParentKey != null) { + mergeNodesToKeyChangers.get(mergeNode).add(entry.getKey()); + } } } } @@ -407,6 +409,11 @@ private void maybeUpdateKeyChangingRepartitionNodeMap() { } } + private boolean mergeNodeHasRepartitionChildren(final StreamsGraphNode mergeNode, + final LinkedHashSet repartitionNodes) { + return repartitionNodes.stream().allMatch(n -> findParentNodeMatching(n, gn -> gn.parentNodes().contains(mergeNode)) != null); + } + @SuppressWarnings("unchecked") private OptimizableRepartitionNode createRepartitionNode(final String repartitionTopicName, final Serde keySerde, diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/graph/StreamsGraphTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/graph/StreamsGraphTest.java index e2006e607e8c8..6a0a0b69ea3f4 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/graph/StreamsGraphTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/graph/StreamsGraphTest.java @@ -22,14 +22,22 @@ import org.apache.kafka.streams.StreamsBuilder; import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.Topology; +import org.apache.kafka.streams.kstream.Aggregator; import org.apache.kafka.streams.kstream.Consumed; import org.apache.kafka.streams.kstream.Grouped; +import org.apache.kafka.streams.kstream.Initializer; import org.apache.kafka.streams.kstream.JoinWindows; +import org.apache.kafka.streams.kstream.Joined; import org.apache.kafka.streams.kstream.KStream; +import org.apache.kafka.streams.kstream.KTable; import org.apache.kafka.streams.kstream.Materialized; import org.apache.kafka.streams.kstream.Produced; +import org.apache.kafka.streams.kstream.Suppressed; import org.apache.kafka.streams.kstream.TimeWindows; +import org.apache.kafka.streams.kstream.Transformer; +import org.apache.kafka.streams.kstream.TransformerSupplier; import org.apache.kafka.streams.kstream.ValueJoiner; +import org.apache.kafka.streams.processor.ProcessorContext; import org.junit.Test; import java.time.Duration; @@ -47,6 +55,8 @@ public class StreamsGraphTest { private final Pattern repartitionTopicPattern = Pattern.compile("Sink: .*-repartition"); + private Initializer initializer; + private Aggregator aggregator; // Test builds topology in succesive manner but only graph node not yet processed written to topology @@ -101,6 +111,76 @@ public void shouldBeAbleToProcessNestedMultipleKeyChangingNodes() { builder.build(properties); } + @Test + @SuppressWarnings("unchecked") + public void shouldNotThrowNPEWithMergeNodes() { + final Properties properties = new Properties(); + properties.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "test-application"); + properties.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); + properties.setProperty(StreamsConfig.TOPOLOGY_OPTIMIZATION, StreamsConfig.OPTIMIZE); + + final StreamsBuilder builder = new StreamsBuilder(); + initializer = () -> ""; + aggregator = (aggKey, value, aggregate) -> aggregate + value.length(); + final TransformerSupplier> transformSupplier = () -> new Transformer>() { + @Override + public void init(final ProcessorContext context) { + + } + + @Override + public KeyValue transform(final String key, final String value) { + return KeyValue.pair(key, value); + } + + @Override + public void close() { + + } + }; + + final KStream retryStream = builder.stream("retryTopic", Consumed.with(Serdes.String(), Serdes.String())) + .transform(transformSupplier) + .groupByKey(Grouped.with(Serdes.String(), Serdes.String())) + .aggregate(initializer, + aggregator, + Materialized.with(Serdes.String(), Serdes.String())) + .suppress(Suppressed.untilTimeLimit(Duration.ofSeconds(500), Suppressed.BufferConfig.maxBytes(64_000_000))) + .toStream() + .flatMap((k, v) -> new ArrayList<>()); + + final KTable idTable = builder.stream("id-table-topic", Consumed.with(Serdes.String(), Serdes.String())) + .flatMap((k, v) -> new ArrayList>()) + .peek((subscriptionId, recipientId) -> System.out.println("data " + subscriptionId + " " + recipientId)) + .groupByKey(Grouped.with(Serdes.String(), Serdes.String())) + .aggregate(initializer, + aggregator, + Materialized.with(Serdes.String(), Serdes.String())); + + final KStream joinStream = builder.stream("internal-topic-command", Consumed.with(Serdes.String(), Serdes.String())) + .peek((subscriptionId, command) -> System.out.println("stdoutput")) + .mapValues((k, v) -> v) + .merge(retryStream) + .leftJoin(idTable, (v1, v2) -> v1 + v2, + Joined.with(Serdes.String(), Serdes.String(), Serdes.String())); + + final KStream[] branches = joinStream.branch((k, v) -> v.equals("some-value"), (k, v) -> true); + + branches[0].map(KeyValue::pair) + .peek((recipientId, command) -> System.out.println("printing out")) + .to("external-command", Produced.with(Serdes.String(), Serdes.String())); + + branches[1].filter((k, v) -> v != null) + .peek((subscriptionId, wrapper) -> System.out.println("Printing output")) + .mapValues((k, v) -> v) + .to("dlq-topic", Produced.with(Serdes.String(), Serdes.String())); + + branches[1].map(KeyValue::pair).to("retryTopic", Produced.with(Serdes.String(), Serdes.String())); + + final Topology topology = builder.build(properties); + assertEquals(expectedComplexMergeOptimizeTopology, topology.describe().toString()); + } + @Test public void shouldNotOptimizeWithValueOrKeyChangingOperatorsAfterInitialKeyChange() { @@ -291,4 +371,110 @@ private int getCountOfRepartitionTopicsFound(final String topologyString) { " Sink: KSTREAM-SINK-0000000007 (topic: output_topic)\n" + " <-- KSTREAM-MERGE-0000000006\n\n"; + + private final String expectedComplexMergeOptimizeTopology = "Topologies:\n" + + " Sub-topology: 0\n" + + " Source: KSTREAM-SOURCE-0000000000 (topics: [retryTopic])\n" + + " --> KSTREAM-TRANSFORM-0000000001\n" + + " Processor: KSTREAM-TRANSFORM-0000000001 (stores: [])\n" + + " --> KSTREAM-FILTER-0000000040\n" + + " <-- KSTREAM-SOURCE-0000000000\n" + + " Processor: KSTREAM-FILTER-0000000040 (stores: [])\n" + + " --> KSTREAM-SINK-0000000039\n" + + " <-- KSTREAM-TRANSFORM-0000000001\n" + + " Sink: KSTREAM-SINK-0000000039 (topic: KSTREAM-AGGREGATE-STATE-STORE-0000000002-repartition)\n" + + " <-- KSTREAM-FILTER-0000000040\n" + + "\n" + + " Sub-topology: 1\n" + + " Source: KSTREAM-SOURCE-0000000041 (topics: [KSTREAM-AGGREGATE-STATE-STORE-0000000002-repartition])\n" + + " --> KSTREAM-AGGREGATE-0000000003\n" + + " Processor: KSTREAM-AGGREGATE-0000000003 (stores: [KSTREAM-AGGREGATE-STATE-STORE-0000000002])\n" + + " --> KTABLE-SUPPRESS-0000000007\n" + + " <-- KSTREAM-SOURCE-0000000041\n" + + " Source: KSTREAM-SOURCE-0000000019 (topics: [internal-topic-command])\n" + + " --> KSTREAM-PEEK-0000000020\n" + + " Processor: KTABLE-SUPPRESS-0000000007 (stores: [KTABLE-SUPPRESS-STATE-STORE-0000000008])\n" + + " --> KTABLE-TOSTREAM-0000000009\n" + + " <-- KSTREAM-AGGREGATE-0000000003\n" + + " Processor: KSTREAM-PEEK-0000000020 (stores: [])\n" + + " --> KSTREAM-MAPVALUES-0000000021\n" + + " <-- KSTREAM-SOURCE-0000000019\n" + + " Processor: KTABLE-TOSTREAM-0000000009 (stores: [])\n" + + " --> KSTREAM-FLATMAP-0000000010\n" + + " <-- KTABLE-SUPPRESS-0000000007\n" + + " Processor: KSTREAM-FLATMAP-0000000010 (stores: [])\n" + + " --> KSTREAM-MERGE-0000000022\n" + + " <-- KTABLE-TOSTREAM-0000000009\n" + + " Processor: KSTREAM-MAPVALUES-0000000021 (stores: [])\n" + + " --> KSTREAM-MERGE-0000000022\n" + + " <-- KSTREAM-PEEK-0000000020\n" + + " Processor: KSTREAM-MERGE-0000000022 (stores: [])\n" + + " --> KSTREAM-FILTER-0000000024\n" + + " <-- KSTREAM-MAPVALUES-0000000021, KSTREAM-FLATMAP-0000000010\n" + + " Processor: KSTREAM-FILTER-0000000024 (stores: [])\n" + + " --> KSTREAM-SINK-0000000023\n" + + " <-- KSTREAM-MERGE-0000000022\n" + + " Sink: KSTREAM-SINK-0000000023 (topic: KSTREAM-MERGE-0000000022-repartition)\n" + + " <-- KSTREAM-FILTER-0000000024\n" + + "\n" + + " Sub-topology: 2\n" + + " Source: KSTREAM-SOURCE-0000000011 (topics: [id-table-topic])\n" + + " --> KSTREAM-FLATMAP-0000000012\n" + + " Processor: KSTREAM-FLATMAP-0000000012 (stores: [])\n" + + " --> KSTREAM-FILTER-0000000043\n" + + " <-- KSTREAM-SOURCE-0000000011\n" + + " Processor: KSTREAM-FILTER-0000000043 (stores: [])\n" + + " --> KSTREAM-SINK-0000000042\n" + + " <-- KSTREAM-FLATMAP-0000000012\n" + + " Sink: KSTREAM-SINK-0000000042 (topic: KSTREAM-AGGREGATE-STATE-STORE-0000000014-repartition)\n" + + " <-- KSTREAM-FILTER-0000000043\n" + + "\n" + + " Sub-topology: 3\n" + + " Source: KSTREAM-SOURCE-0000000025 (topics: [KSTREAM-MERGE-0000000022-repartition])\n" + + " --> KSTREAM-LEFTJOIN-0000000026\n" + + " Processor: KSTREAM-LEFTJOIN-0000000026 (stores: [KSTREAM-AGGREGATE-STATE-STORE-0000000014])\n" + + " --> KSTREAM-BRANCH-0000000027\n" + + " <-- KSTREAM-SOURCE-0000000025\n" + + " Processor: KSTREAM-BRANCH-0000000027 (stores: [])\n" + + " --> KSTREAM-BRANCHCHILD-0000000029, KSTREAM-BRANCHCHILD-0000000028\n" + + " <-- KSTREAM-LEFTJOIN-0000000026\n" + + " Processor: KSTREAM-BRANCHCHILD-0000000029 (stores: [])\n" + + " --> KSTREAM-FILTER-0000000033, KSTREAM-MAP-0000000037\n" + + " <-- KSTREAM-BRANCH-0000000027\n" + + " Processor: KSTREAM-BRANCHCHILD-0000000028 (stores: [])\n" + + " --> KSTREAM-MAP-0000000030\n" + + " <-- KSTREAM-BRANCH-0000000027\n" + + " Processor: KSTREAM-FILTER-0000000033 (stores: [])\n" + + " --> KSTREAM-PEEK-0000000034\n" + + " <-- KSTREAM-BRANCHCHILD-0000000029\n" + + " Processor: KSTREAM-MAP-0000000030 (stores: [])\n" + + " --> KSTREAM-PEEK-0000000031\n" + + " <-- KSTREAM-BRANCHCHILD-0000000028\n" + + " Processor: KSTREAM-PEEK-0000000034 (stores: [])\n" + + " --> KSTREAM-MAPVALUES-0000000035\n" + + " <-- KSTREAM-FILTER-0000000033\n" + + " Source: KSTREAM-SOURCE-0000000044 (topics: [KSTREAM-AGGREGATE-STATE-STORE-0000000014-repartition])\n" + + " --> KSTREAM-PEEK-0000000013\n" + + " Processor: KSTREAM-MAP-0000000037 (stores: [])\n" + + " --> KSTREAM-SINK-0000000038\n" + + " <-- KSTREAM-BRANCHCHILD-0000000029\n" + + " Processor: KSTREAM-MAPVALUES-0000000035 (stores: [])\n" + + " --> KSTREAM-SINK-0000000036\n" + + " <-- KSTREAM-PEEK-0000000034\n" + + " Processor: KSTREAM-PEEK-0000000013 (stores: [])\n" + + " --> KSTREAM-AGGREGATE-0000000015\n" + + " <-- KSTREAM-SOURCE-0000000044\n" + + " Processor: KSTREAM-PEEK-0000000031 (stores: [])\n" + + " --> KSTREAM-SINK-0000000032\n" + + " <-- KSTREAM-MAP-0000000030\n" + + " Processor: KSTREAM-AGGREGATE-0000000015 (stores: [KSTREAM-AGGREGATE-STATE-STORE-0000000014])\n" + + " --> none\n" + + " <-- KSTREAM-PEEK-0000000013\n" + + " Sink: KSTREAM-SINK-0000000032 (topic: external-command)\n" + + " <-- KSTREAM-PEEK-0000000031\n" + + " Sink: KSTREAM-SINK-0000000036 (topic: dlq-topic)\n" + + " <-- KSTREAM-MAPVALUES-0000000035\n" + + " Sink: KSTREAM-SINK-0000000038 (topic: retryTopic)\n" + + " <-- KSTREAM-MAP-0000000037\n\n"; + } From ee1a1b8a22b7e2096057446ec95de69c8af04a2a Mon Sep 17 00:00:00 2001 From: bill Date: Fri, 3 Apr 2020 15:51:25 -0400 Subject: [PATCH 2/2] KAFKA-9739: This is a port of trunk/2.4 pr. In 2.4 we changed how we sort source nodes so to merge this PR with 2.3 I had to update the expected topology to what was expected on 2.3 --- .../internals/graph/StreamsGraphTest.java | 208 +++++++++--------- 1 file changed, 104 insertions(+), 104 deletions(-) diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/graph/StreamsGraphTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/graph/StreamsGraphTest.java index 6a0a0b69ea3f4..84578eae1f864 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/graph/StreamsGraphTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/graph/StreamsGraphTest.java @@ -372,109 +372,109 @@ private int getCountOfRepartitionTopicsFound(final String topologyString) { " <-- KSTREAM-MERGE-0000000006\n\n"; - private final String expectedComplexMergeOptimizeTopology = "Topologies:\n" + - " Sub-topology: 0\n" + - " Source: KSTREAM-SOURCE-0000000000 (topics: [retryTopic])\n" + - " --> KSTREAM-TRANSFORM-0000000001\n" + - " Processor: KSTREAM-TRANSFORM-0000000001 (stores: [])\n" + - " --> KSTREAM-FILTER-0000000040\n" + - " <-- KSTREAM-SOURCE-0000000000\n" + - " Processor: KSTREAM-FILTER-0000000040 (stores: [])\n" + - " --> KSTREAM-SINK-0000000039\n" + - " <-- KSTREAM-TRANSFORM-0000000001\n" + - " Sink: KSTREAM-SINK-0000000039 (topic: KSTREAM-AGGREGATE-STATE-STORE-0000000002-repartition)\n" + - " <-- KSTREAM-FILTER-0000000040\n" + - "\n" + - " Sub-topology: 1\n" + - " Source: KSTREAM-SOURCE-0000000041 (topics: [KSTREAM-AGGREGATE-STATE-STORE-0000000002-repartition])\n" + - " --> KSTREAM-AGGREGATE-0000000003\n" + - " Processor: KSTREAM-AGGREGATE-0000000003 (stores: [KSTREAM-AGGREGATE-STATE-STORE-0000000002])\n" + - " --> KTABLE-SUPPRESS-0000000007\n" + - " <-- KSTREAM-SOURCE-0000000041\n" + - " Source: KSTREAM-SOURCE-0000000019 (topics: [internal-topic-command])\n" + - " --> KSTREAM-PEEK-0000000020\n" + - " Processor: KTABLE-SUPPRESS-0000000007 (stores: [KTABLE-SUPPRESS-STATE-STORE-0000000008])\n" + - " --> KTABLE-TOSTREAM-0000000009\n" + - " <-- KSTREAM-AGGREGATE-0000000003\n" + - " Processor: KSTREAM-PEEK-0000000020 (stores: [])\n" + - " --> KSTREAM-MAPVALUES-0000000021\n" + - " <-- KSTREAM-SOURCE-0000000019\n" + - " Processor: KTABLE-TOSTREAM-0000000009 (stores: [])\n" + - " --> KSTREAM-FLATMAP-0000000010\n" + - " <-- KTABLE-SUPPRESS-0000000007\n" + - " Processor: KSTREAM-FLATMAP-0000000010 (stores: [])\n" + - " --> KSTREAM-MERGE-0000000022\n" + - " <-- KTABLE-TOSTREAM-0000000009\n" + - " Processor: KSTREAM-MAPVALUES-0000000021 (stores: [])\n" + - " --> KSTREAM-MERGE-0000000022\n" + - " <-- KSTREAM-PEEK-0000000020\n" + - " Processor: KSTREAM-MERGE-0000000022 (stores: [])\n" + - " --> KSTREAM-FILTER-0000000024\n" + - " <-- KSTREAM-MAPVALUES-0000000021, KSTREAM-FLATMAP-0000000010\n" + - " Processor: KSTREAM-FILTER-0000000024 (stores: [])\n" + - " --> KSTREAM-SINK-0000000023\n" + - " <-- KSTREAM-MERGE-0000000022\n" + - " Sink: KSTREAM-SINK-0000000023 (topic: KSTREAM-MERGE-0000000022-repartition)\n" + - " <-- KSTREAM-FILTER-0000000024\n" + - "\n" + - " Sub-topology: 2\n" + - " Source: KSTREAM-SOURCE-0000000011 (topics: [id-table-topic])\n" + - " --> KSTREAM-FLATMAP-0000000012\n" + - " Processor: KSTREAM-FLATMAP-0000000012 (stores: [])\n" + - " --> KSTREAM-FILTER-0000000043\n" + - " <-- KSTREAM-SOURCE-0000000011\n" + - " Processor: KSTREAM-FILTER-0000000043 (stores: [])\n" + - " --> KSTREAM-SINK-0000000042\n" + - " <-- KSTREAM-FLATMAP-0000000012\n" + - " Sink: KSTREAM-SINK-0000000042 (topic: KSTREAM-AGGREGATE-STATE-STORE-0000000014-repartition)\n" + - " <-- KSTREAM-FILTER-0000000043\n" + - "\n" + - " Sub-topology: 3\n" + - " Source: KSTREAM-SOURCE-0000000025 (topics: [KSTREAM-MERGE-0000000022-repartition])\n" + - " --> KSTREAM-LEFTJOIN-0000000026\n" + - " Processor: KSTREAM-LEFTJOIN-0000000026 (stores: [KSTREAM-AGGREGATE-STATE-STORE-0000000014])\n" + - " --> KSTREAM-BRANCH-0000000027\n" + - " <-- KSTREAM-SOURCE-0000000025\n" + - " Processor: KSTREAM-BRANCH-0000000027 (stores: [])\n" + - " --> KSTREAM-BRANCHCHILD-0000000029, KSTREAM-BRANCHCHILD-0000000028\n" + - " <-- KSTREAM-LEFTJOIN-0000000026\n" + - " Processor: KSTREAM-BRANCHCHILD-0000000029 (stores: [])\n" + - " --> KSTREAM-FILTER-0000000033, KSTREAM-MAP-0000000037\n" + - " <-- KSTREAM-BRANCH-0000000027\n" + - " Processor: KSTREAM-BRANCHCHILD-0000000028 (stores: [])\n" + - " --> KSTREAM-MAP-0000000030\n" + - " <-- KSTREAM-BRANCH-0000000027\n" + - " Processor: KSTREAM-FILTER-0000000033 (stores: [])\n" + - " --> KSTREAM-PEEK-0000000034\n" + - " <-- KSTREAM-BRANCHCHILD-0000000029\n" + - " Processor: KSTREAM-MAP-0000000030 (stores: [])\n" + - " --> KSTREAM-PEEK-0000000031\n" + - " <-- KSTREAM-BRANCHCHILD-0000000028\n" + - " Processor: KSTREAM-PEEK-0000000034 (stores: [])\n" + - " --> KSTREAM-MAPVALUES-0000000035\n" + - " <-- KSTREAM-FILTER-0000000033\n" + - " Source: KSTREAM-SOURCE-0000000044 (topics: [KSTREAM-AGGREGATE-STATE-STORE-0000000014-repartition])\n" + - " --> KSTREAM-PEEK-0000000013\n" + - " Processor: KSTREAM-MAP-0000000037 (stores: [])\n" + - " --> KSTREAM-SINK-0000000038\n" + - " <-- KSTREAM-BRANCHCHILD-0000000029\n" + - " Processor: KSTREAM-MAPVALUES-0000000035 (stores: [])\n" + - " --> KSTREAM-SINK-0000000036\n" + - " <-- KSTREAM-PEEK-0000000034\n" + - " Processor: KSTREAM-PEEK-0000000013 (stores: [])\n" + - " --> KSTREAM-AGGREGATE-0000000015\n" + - " <-- KSTREAM-SOURCE-0000000044\n" + - " Processor: KSTREAM-PEEK-0000000031 (stores: [])\n" + - " --> KSTREAM-SINK-0000000032\n" + - " <-- KSTREAM-MAP-0000000030\n" + - " Processor: KSTREAM-AGGREGATE-0000000015 (stores: [KSTREAM-AGGREGATE-STATE-STORE-0000000014])\n" + - " --> none\n" + - " <-- KSTREAM-PEEK-0000000013\n" + - " Sink: KSTREAM-SINK-0000000032 (topic: external-command)\n" + - " <-- KSTREAM-PEEK-0000000031\n" + - " Sink: KSTREAM-SINK-0000000036 (topic: dlq-topic)\n" + - " <-- KSTREAM-MAPVALUES-0000000035\n" + - " Sink: KSTREAM-SINK-0000000038 (topic: retryTopic)\n" + - " <-- KSTREAM-MAP-0000000037\n\n"; + private final String expectedComplexMergeOptimizeTopology = "Topologies:\n" + + " Sub-topology: 0\n" + + " Source: KSTREAM-SOURCE-0000000000 (topics: [retryTopic])\n" + + " --> KSTREAM-TRANSFORM-0000000001\n" + + " Processor: KSTREAM-TRANSFORM-0000000001 (stores: [])\n" + + " --> KSTREAM-FILTER-0000000040\n" + + " <-- KSTREAM-SOURCE-0000000000\n" + + " Processor: KSTREAM-FILTER-0000000040 (stores: [])\n" + + " --> KSTREAM-SINK-0000000039\n" + + " <-- KSTREAM-TRANSFORM-0000000001\n" + + " Sink: KSTREAM-SINK-0000000039 (topic: KSTREAM-AGGREGATE-STATE-STORE-0000000002-repartition)\n" + + " <-- KSTREAM-FILTER-0000000040\n" + + "\n" + + " Sub-topology: 1\n" + + " Source: KSTREAM-SOURCE-0000000011 (topics: [id-table-topic])\n" + + " --> KSTREAM-FLATMAP-0000000012\n" + + " Processor: KSTREAM-FLATMAP-0000000012 (stores: [])\n" + + " --> KSTREAM-FILTER-0000000043\n" + + " <-- KSTREAM-SOURCE-0000000011\n" + + " Processor: KSTREAM-FILTER-0000000043 (stores: [])\n" + + " --> KSTREAM-SINK-0000000042\n" + + " <-- KSTREAM-FLATMAP-0000000012\n" + + " Sink: KSTREAM-SINK-0000000042 (topic: KSTREAM-AGGREGATE-STATE-STORE-0000000014-repartition)\n" + + " <-- KSTREAM-FILTER-0000000043\n" + + "\n" + + " Sub-topology: 2\n" + + " Source: KSTREAM-SOURCE-0000000041 (topics: [KSTREAM-AGGREGATE-STATE-STORE-0000000002-repartition])\n" + + " --> KSTREAM-AGGREGATE-0000000003\n" + + " Processor: KSTREAM-AGGREGATE-0000000003 (stores: [KSTREAM-AGGREGATE-STATE-STORE-0000000002])\n" + + " --> KTABLE-SUPPRESS-0000000007\n" + + " <-- KSTREAM-SOURCE-0000000041\n" + + " Source: KSTREAM-SOURCE-0000000019 (topics: [internal-topic-command])\n" + + " --> KSTREAM-PEEK-0000000020\n" + + " Processor: KTABLE-SUPPRESS-0000000007 (stores: [KTABLE-SUPPRESS-STATE-STORE-0000000008])\n" + + " --> KTABLE-TOSTREAM-0000000009\n" + + " <-- KSTREAM-AGGREGATE-0000000003\n" + + " Processor: KSTREAM-PEEK-0000000020 (stores: [])\n" + + " --> KSTREAM-MAPVALUES-0000000021\n" + + " <-- KSTREAM-SOURCE-0000000019\n" + + " Processor: KTABLE-TOSTREAM-0000000009 (stores: [])\n" + + " --> KSTREAM-FLATMAP-0000000010\n" + + " <-- KTABLE-SUPPRESS-0000000007\n" + + " Processor: KSTREAM-FLATMAP-0000000010 (stores: [])\n" + + " --> KSTREAM-MERGE-0000000022\n" + + " <-- KTABLE-TOSTREAM-0000000009\n" + + " Processor: KSTREAM-MAPVALUES-0000000021 (stores: [])\n" + + " --> KSTREAM-MERGE-0000000022\n" + + " <-- KSTREAM-PEEK-0000000020\n" + + " Processor: KSTREAM-MERGE-0000000022 (stores: [])\n" + + " --> KSTREAM-FILTER-0000000024\n" + + " <-- KSTREAM-MAPVALUES-0000000021, KSTREAM-FLATMAP-0000000010\n" + + " Processor: KSTREAM-FILTER-0000000024 (stores: [])\n" + + " --> KSTREAM-SINK-0000000023\n" + + " <-- KSTREAM-MERGE-0000000022\n" + + " Sink: KSTREAM-SINK-0000000023 (topic: KSTREAM-MERGE-0000000022-repartition)\n" + + " <-- KSTREAM-FILTER-0000000024\n" + + "\n" + + " Sub-topology: 3\n" + + " Source: KSTREAM-SOURCE-0000000025 (topics: [KSTREAM-MERGE-0000000022-repartition])\n" + + " --> KSTREAM-LEFTJOIN-0000000026\n" + + " Processor: KSTREAM-LEFTJOIN-0000000026 (stores: [KSTREAM-AGGREGATE-STATE-STORE-0000000014])\n" + + " --> KSTREAM-BRANCH-0000000027\n" + + " <-- KSTREAM-SOURCE-0000000025\n" + + " Processor: KSTREAM-BRANCH-0000000027 (stores: [])\n" + + " --> KSTREAM-BRANCHCHILD-0000000029, KSTREAM-BRANCHCHILD-0000000028\n" + + " <-- KSTREAM-LEFTJOIN-0000000026\n" + + " Processor: KSTREAM-BRANCHCHILD-0000000029 (stores: [])\n" + + " --> KSTREAM-FILTER-0000000033, KSTREAM-MAP-0000000037\n" + + " <-- KSTREAM-BRANCH-0000000027\n" + + " Processor: KSTREAM-BRANCHCHILD-0000000028 (stores: [])\n" + + " --> KSTREAM-MAP-0000000030\n" + + " <-- KSTREAM-BRANCH-0000000027\n" + + " Processor: KSTREAM-FILTER-0000000033 (stores: [])\n" + + " --> KSTREAM-PEEK-0000000034\n" + + " <-- KSTREAM-BRANCHCHILD-0000000029\n" + + " Processor: KSTREAM-MAP-0000000030 (stores: [])\n" + + " --> KSTREAM-PEEK-0000000031\n" + + " <-- KSTREAM-BRANCHCHILD-0000000028\n" + + " Processor: KSTREAM-PEEK-0000000034 (stores: [])\n" + + " --> KSTREAM-MAPVALUES-0000000035\n" + + " <-- KSTREAM-FILTER-0000000033\n" + + " Source: KSTREAM-SOURCE-0000000044 (topics: [KSTREAM-AGGREGATE-STATE-STORE-0000000014-repartition])\n" + + " --> KSTREAM-PEEK-0000000013\n" + + " Processor: KSTREAM-MAP-0000000037 (stores: [])\n" + + " --> KSTREAM-SINK-0000000038\n" + + " <-- KSTREAM-BRANCHCHILD-0000000029\n" + + " Processor: KSTREAM-MAPVALUES-0000000035 (stores: [])\n" + + " --> KSTREAM-SINK-0000000036\n" + + " <-- KSTREAM-PEEK-0000000034\n" + + " Processor: KSTREAM-PEEK-0000000013 (stores: [])\n" + + " --> KSTREAM-AGGREGATE-0000000015\n" + + " <-- KSTREAM-SOURCE-0000000044\n" + + " Processor: KSTREAM-PEEK-0000000031 (stores: [])\n" + + " --> KSTREAM-SINK-0000000032\n" + + " <-- KSTREAM-MAP-0000000030\n" + + " Processor: KSTREAM-AGGREGATE-0000000015 (stores: [KSTREAM-AGGREGATE-STATE-STORE-0000000014])\n" + + " --> none\n" + + " <-- KSTREAM-PEEK-0000000013\n" + + " Sink: KSTREAM-SINK-0000000032 (topic: external-command)\n" + + " <-- KSTREAM-PEEK-0000000031\n" + + " Sink: KSTREAM-SINK-0000000036 (topic: dlq-topic)\n" + + " <-- KSTREAM-MAPVALUES-0000000035\n" + + " Sink: KSTREAM-SINK-0000000038 (topic: retryTopic)\n" + + " <-- KSTREAM-MAP-0000000037\n\n"; }