Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -382,11 +382,13 @@ private void maybeUpdateKeyChangingRepartitionNodeMap() {
final Set<StreamsGraphNode> mergeNodeKeyChangingParentsToRemove = new HashSet<>();
for (final StreamsGraphNode mergeNode : mergeNodes) {
mergeNodesToKeyChangers.put(mergeNode, new LinkedHashSet<>());
final Collection<StreamsGraphNode> keys = keyChangingOperationsToOptimizableRepartitionNodes.keySet();
for (final StreamsGraphNode key : keys) {
final StreamsGraphNode maybeParentKey = findParentNodeMatching(mergeNode, node -> node.parentNodes().contains(key));
if (maybeParentKey != null) {
mergeNodesToKeyChangers.get(mergeNode).add(key);
final Set<Map.Entry<StreamsGraphNode, LinkedHashSet<OptimizableRepartitionNode>>> entrySet = keyChangingOperationsToOptimizableRepartitionNodes.entrySet();
for (final Map.Entry<StreamsGraphNode, LinkedHashSet<OptimizableRepartitionNode>> entry : entrySet) {
if (mergeNodeHasRepartitionChildren(mergeNode, entry.getValue())) {
final StreamsGraphNode maybeParentKey = findParentNodeMatching(mergeNode, node -> node.parentNodes().contains(entry.getKey()));
if (maybeParentKey != null) {
mergeNodesToKeyChangers.get(mergeNode).add(entry.getKey());
}
}
}
}
Expand All @@ -407,6 +409,11 @@ private void maybeUpdateKeyChangingRepartitionNodeMap() {
}
}

/**
 * Checks whether every repartition node in {@code repartitionNodes} can be traced back to
 * {@code mergeNode}.
 *
 * <p>Each repartition node is handed to {@code findParentNodeMatching} together with a predicate
 * that accepts a graph node whose {@code parentNodes()} contains {@code mergeNode}. The first
 * repartition node for which that lookup yields {@code null} ends the scan.
 *
 * @param mergeNode        the merge node that should appear as a parent on the path
 * @param repartitionNodes the repartition nodes to check, visited in the set's iteration order
 * @return {@code true} if the lookup succeeds for all nodes (including when the set is empty),
 *         {@code false} as soon as one lookup returns {@code null}
 */
private boolean mergeNodeHasRepartitionChildren(final StreamsGraphNode mergeNode,
                                                final LinkedHashSet<OptimizableRepartitionNode> repartitionNodes) {
    for (final OptimizableRepartitionNode repartitionNode : repartitionNodes) {
        final StreamsGraphNode childOfMergeNode =
            findParentNodeMatching(repartitionNode, candidate -> candidate.parentNodes().contains(mergeNode));
        if (childOfMergeNode == null) {
            // One repartition node without the merge node above it is enough to fail the check.
            return false;
        }
    }
    return true;
}

@SuppressWarnings("unchecked")
private OptimizableRepartitionNode createRepartitionNode(final String repartitionTopicName,
final Serde keySerde,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,22 @@
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.Aggregator;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.Initializer;
import org.apache.kafka.streams.kstream.JoinWindows;
import org.apache.kafka.streams.kstream.Joined;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.kstream.Suppressed;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.kstream.Transformer;
import org.apache.kafka.streams.kstream.TransformerSupplier;
import org.apache.kafka.streams.kstream.ValueJoiner;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.junit.Test;

import java.time.Duration;
Expand All @@ -47,6 +55,8 @@
public class StreamsGraphTest {

// Matches a sink line whose text ends in "-repartition", e.g. the
// "Sink: ... (topic: ...-repartition)" lines of a topology description.
private final Pattern repartitionTopicPattern = Pattern.compile("Sink: .*-repartition");
// Assigned inside shouldNotThrowNPEWithMergeNodes() before being passed to aggregate().
private Initializer<String> initializer;
// Assigned inside shouldNotThrowNPEWithMergeNodes() before being passed to aggregate().
private Aggregator<String, String, String> aggregator;

// Test builds the topology in a successive manner, but only graph nodes not yet processed are written to the topology

Expand Down Expand Up @@ -101,6 +111,76 @@ public void shouldBeAbleToProcessNestedMultipleKeyChangingNodes() {
builder.build(properties);
}

/**
 * Builds an optimized topology that merges two streams, joins the merged stream against a table,
 * branches the result, and writes one branch back to the topic the first stream reads from.
 * The test passes when {@code builder.build(properties)} completes and the resulting
 * {@code topology.describe().toString()} equals {@code expectedComplexMergeOptimizeTopology}.
 *
 * <p>NOTE(review): the numeric suffixes in the expected description presumably follow the order
 * of the DSL calls below - confirm before reordering any statement in this method.
 */
@Test
@SuppressWarnings("unchecked")
public void shouldNotThrowNPEWithMergeNodes() {
final Properties properties = new Properties();
properties.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "test-application");
properties.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
// Topology optimization is switched on; the properties are handed to builder.build() at the end.
properties.setProperty(StreamsConfig.TOPOLOGY_OPTIMIZATION, StreamsConfig.OPTIMIZE);

final StreamsBuilder builder = new StreamsBuilder();
// Aggregation starts from the empty string and appends the length of each incoming value.
initializer = () -> "";
aggregator = (aggKey, value, aggregate) -> aggregate + value.length();
// Pass-through transformer: init() and close() do nothing, transform() returns the same key/value pair.
final TransformerSupplier<String, String, KeyValue<String, String>> transformSupplier = () -> new Transformer<String, String, KeyValue<String, String>>() {
@Override
public void init(final ProcessorContext context) {

}

@Override
public KeyValue<String, String> transform(final String key, final String value) {
return KeyValue.pair(key, value);
}

@Override
public void close() {

}
};

// First input: "retryTopic" -> transform -> groupByKey/aggregate -> suppress -> toStream -> flatMap.
// The flatMap lambda returns an empty list for every record.
final KStream<String, String> retryStream = builder.stream("retryTopic", Consumed.with(Serdes.String(), Serdes.String()))
.transform(transformSupplier)
.groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
.aggregate(initializer,
aggregator,
Materialized.with(Serdes.String(), Serdes.String()))
.suppress(Suppressed.untilTimeLimit(Duration.ofSeconds(500), Suppressed.BufferConfig.maxBytes(64_000_000)))
.toStream()
.flatMap((k, v) -> new ArrayList<>());

// Second input: "id-table-topic" -> flatMap -> peek -> groupByKey/aggregate, yielding the table used in the join.
final KTable<String, String> idTable = builder.stream("id-table-topic", Consumed.with(Serdes.String(), Serdes.String()))
.flatMap((k, v) -> new ArrayList<KeyValue<String, String>>())
.peek((subscriptionId, recipientId) -> System.out.println("data " + subscriptionId + " " + recipientId))
.groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
.aggregate(initializer,
aggregator,
Materialized.with(Serdes.String(), Serdes.String()));

// Third input: "internal-topic-command" is merged with retryStream, then left-joined against idTable.
final KStream<String, String> joinStream = builder.stream("internal-topic-command", Consumed.with(Serdes.String(), Serdes.String()))
.peek((subscriptionId, command) -> System.out.println("stdoutput"))
.mapValues((k, v) -> v)
.merge(retryStream)
.leftJoin(idTable, (v1, v2) -> v1 + v2,
Joined.with(Serdes.String(), Serdes.String(), Serdes.String()));

// Two branches: values equal to "some-value" go to branches[0], everything else to branches[1].
final KStream<String, String>[] branches = joinStream.branch((k, v) -> v.equals("some-value"), (k, v) -> true);

branches[0].map(KeyValue::pair)
.peek((recipientId, command) -> System.out.println("printing out"))
.to("external-command", Produced.with(Serdes.String(), Serdes.String()));

branches[1].filter((k, v) -> v != null)
.peek((subscriptionId, wrapper) -> System.out.println("Printing output"))
.mapValues((k, v) -> v)
.to("dlq-topic", Produced.with(Serdes.String(), Serdes.String()));

// The second branch is also written to "retryTopic", the same topic retryStream consumes.
branches[1].map(KeyValue::pair).to("retryTopic", Produced.with(Serdes.String(), Serdes.String()));

// Judging by the test name, build() is the call that previously threw the NPE.
final Topology topology = builder.build(properties);
assertEquals(expectedComplexMergeOptimizeTopology, topology.describe().toString());
}

@Test
public void shouldNotOptimizeWithValueOrKeyChangingOperatorsAfterInitialKeyChange() {

Expand Down Expand Up @@ -291,4 +371,110 @@ private int getCountOfRepartitionTopicsFound(final String topologyString) {
" Sink: KSTREAM-SINK-0000000007 (topic: output_topic)\n" +
" <-- KSTREAM-MERGE-0000000006\n\n";


// Expected output of topology.describe().toString() for shouldNotThrowNPEWithMergeNodes().
// It lists four sub-topologies (0-3) and three repartition topics:
// KSTREAM-AGGREGATE-STATE-STORE-0000000002-repartition,
// KSTREAM-AGGREGATE-STATE-STORE-0000000014-repartition and
// KSTREAM-MERGE-0000000022-repartition.
// The test compares with assertEquals, so every character, including whitespace and the
// trailing "\n\n", is significant.
private final String expectedComplexMergeOptimizeTopology = "Topologies:\n"
+ " Sub-topology: 0\n"
+ " Source: KSTREAM-SOURCE-0000000000 (topics: [retryTopic])\n"
+ " --> KSTREAM-TRANSFORM-0000000001\n"
+ " Processor: KSTREAM-TRANSFORM-0000000001 (stores: [])\n"
+ " --> KSTREAM-FILTER-0000000040\n"
+ " <-- KSTREAM-SOURCE-0000000000\n"
+ " Processor: KSTREAM-FILTER-0000000040 (stores: [])\n"
+ " --> KSTREAM-SINK-0000000039\n"
+ " <-- KSTREAM-TRANSFORM-0000000001\n"
+ " Sink: KSTREAM-SINK-0000000039 (topic: KSTREAM-AGGREGATE-STATE-STORE-0000000002-repartition)\n"
+ " <-- KSTREAM-FILTER-0000000040\n"
+ "\n"
+ " Sub-topology: 1\n"
+ " Source: KSTREAM-SOURCE-0000000011 (topics: [id-table-topic])\n"
+ " --> KSTREAM-FLATMAP-0000000012\n"
+ " Processor: KSTREAM-FLATMAP-0000000012 (stores: [])\n"
+ " --> KSTREAM-FILTER-0000000043\n"
+ " <-- KSTREAM-SOURCE-0000000011\n"
+ " Processor: KSTREAM-FILTER-0000000043 (stores: [])\n"
+ " --> KSTREAM-SINK-0000000042\n"
+ " <-- KSTREAM-FLATMAP-0000000012\n"
+ " Sink: KSTREAM-SINK-0000000042 (topic: KSTREAM-AGGREGATE-STATE-STORE-0000000014-repartition)\n"
+ " <-- KSTREAM-FILTER-0000000043\n"
+ "\n"
+ " Sub-topology: 2\n"
+ " Source: KSTREAM-SOURCE-0000000041 (topics: [KSTREAM-AGGREGATE-STATE-STORE-0000000002-repartition])\n"
+ " --> KSTREAM-AGGREGATE-0000000003\n"
+ " Processor: KSTREAM-AGGREGATE-0000000003 (stores: [KSTREAM-AGGREGATE-STATE-STORE-0000000002])\n"
+ " --> KTABLE-SUPPRESS-0000000007\n"
+ " <-- KSTREAM-SOURCE-0000000041\n"
+ " Source: KSTREAM-SOURCE-0000000019 (topics: [internal-topic-command])\n"
+ " --> KSTREAM-PEEK-0000000020\n"
+ " Processor: KTABLE-SUPPRESS-0000000007 (stores: [KTABLE-SUPPRESS-STATE-STORE-0000000008])\n"
+ " --> KTABLE-TOSTREAM-0000000009\n"
+ " <-- KSTREAM-AGGREGATE-0000000003\n"
+ " Processor: KSTREAM-PEEK-0000000020 (stores: [])\n"
+ " --> KSTREAM-MAPVALUES-0000000021\n"
+ " <-- KSTREAM-SOURCE-0000000019\n"
+ " Processor: KTABLE-TOSTREAM-0000000009 (stores: [])\n"
+ " --> KSTREAM-FLATMAP-0000000010\n"
+ " <-- KTABLE-SUPPRESS-0000000007\n"
+ " Processor: KSTREAM-FLATMAP-0000000010 (stores: [])\n"
+ " --> KSTREAM-MERGE-0000000022\n"
+ " <-- KTABLE-TOSTREAM-0000000009\n"
+ " Processor: KSTREAM-MAPVALUES-0000000021 (stores: [])\n"
+ " --> KSTREAM-MERGE-0000000022\n"
+ " <-- KSTREAM-PEEK-0000000020\n"
+ " Processor: KSTREAM-MERGE-0000000022 (stores: [])\n"
+ " --> KSTREAM-FILTER-0000000024\n"
+ " <-- KSTREAM-MAPVALUES-0000000021, KSTREAM-FLATMAP-0000000010\n"
+ " Processor: KSTREAM-FILTER-0000000024 (stores: [])\n"
+ " --> KSTREAM-SINK-0000000023\n"
+ " <-- KSTREAM-MERGE-0000000022\n"
+ " Sink: KSTREAM-SINK-0000000023 (topic: KSTREAM-MERGE-0000000022-repartition)\n"
+ " <-- KSTREAM-FILTER-0000000024\n"
+ "\n"
+ " Sub-topology: 3\n"
+ " Source: KSTREAM-SOURCE-0000000025 (topics: [KSTREAM-MERGE-0000000022-repartition])\n"
+ " --> KSTREAM-LEFTJOIN-0000000026\n"
+ " Processor: KSTREAM-LEFTJOIN-0000000026 (stores: [KSTREAM-AGGREGATE-STATE-STORE-0000000014])\n"
+ " --> KSTREAM-BRANCH-0000000027\n"
+ " <-- KSTREAM-SOURCE-0000000025\n"
+ " Processor: KSTREAM-BRANCH-0000000027 (stores: [])\n"
+ " --> KSTREAM-BRANCHCHILD-0000000029, KSTREAM-BRANCHCHILD-0000000028\n"
+ " <-- KSTREAM-LEFTJOIN-0000000026\n"
+ " Processor: KSTREAM-BRANCHCHILD-0000000029 (stores: [])\n"
+ " --> KSTREAM-FILTER-0000000033, KSTREAM-MAP-0000000037\n"
+ " <-- KSTREAM-BRANCH-0000000027\n"
+ " Processor: KSTREAM-BRANCHCHILD-0000000028 (stores: [])\n"
+ " --> KSTREAM-MAP-0000000030\n"
+ " <-- KSTREAM-BRANCH-0000000027\n"
+ " Processor: KSTREAM-FILTER-0000000033 (stores: [])\n"
+ " --> KSTREAM-PEEK-0000000034\n"
+ " <-- KSTREAM-BRANCHCHILD-0000000029\n"
+ " Processor: KSTREAM-MAP-0000000030 (stores: [])\n"
+ " --> KSTREAM-PEEK-0000000031\n"
+ " <-- KSTREAM-BRANCHCHILD-0000000028\n"
+ " Processor: KSTREAM-PEEK-0000000034 (stores: [])\n"
+ " --> KSTREAM-MAPVALUES-0000000035\n"
+ " <-- KSTREAM-FILTER-0000000033\n"
+ " Source: KSTREAM-SOURCE-0000000044 (topics: [KSTREAM-AGGREGATE-STATE-STORE-0000000014-repartition])\n"
+ " --> KSTREAM-PEEK-0000000013\n"
+ " Processor: KSTREAM-MAP-0000000037 (stores: [])\n"
+ " --> KSTREAM-SINK-0000000038\n"
+ " <-- KSTREAM-BRANCHCHILD-0000000029\n"
+ " Processor: KSTREAM-MAPVALUES-0000000035 (stores: [])\n"
+ " --> KSTREAM-SINK-0000000036\n"
+ " <-- KSTREAM-PEEK-0000000034\n"
+ " Processor: KSTREAM-PEEK-0000000013 (stores: [])\n"
+ " --> KSTREAM-AGGREGATE-0000000015\n"
+ " <-- KSTREAM-SOURCE-0000000044\n"
+ " Processor: KSTREAM-PEEK-0000000031 (stores: [])\n"
+ " --> KSTREAM-SINK-0000000032\n"
+ " <-- KSTREAM-MAP-0000000030\n"
+ " Processor: KSTREAM-AGGREGATE-0000000015 (stores: [KSTREAM-AGGREGATE-STATE-STORE-0000000014])\n"
+ " --> none\n"
+ " <-- KSTREAM-PEEK-0000000013\n"
+ " Sink: KSTREAM-SINK-0000000032 (topic: external-command)\n"
+ " <-- KSTREAM-PEEK-0000000031\n"
+ " Sink: KSTREAM-SINK-0000000036 (topic: dlq-topic)\n"
+ " <-- KSTREAM-MAPVALUES-0000000035\n"
+ " Sink: KSTREAM-SINK-0000000038 (topic: retryTopic)\n"
+ " <-- KSTREAM-MAP-0000000037\n\n";

}