From dfdf63573ba083b6d186a2bd4bb7c2d39a4b6156 Mon Sep 17 00:00:00 2001 From: ableegoldman Date: Mon, 23 Nov 2020 17:09:15 -0800 Subject: [PATCH 1/3] refactor upDateSourceTopics, fix up test --- .../processor/internals/AbstractTask.java | 4 +-- .../internals/ProcessorTopology.java | 32 +++++++++++-------- .../processor/internals/StreamTask.java | 4 +-- .../streams/processor/internals/Task.java | 2 +- .../processor/internals/TaskManager.java | 2 +- .../RegexSourceIntegrationTest.java | 15 +++++++-- .../processor/internals/TaskManagerTest.java | 2 +- 7 files changed, 38 insertions(+), 23 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java index 126c7d5a8822e..3f4d6de0f46ec 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java @@ -154,9 +154,9 @@ final void transitionTo(final Task.State newState) { } @Override - public void update(final Set topicPartitions, final Map> nodeToSourceTopics) { + public void update(final Set topicPartitions, final Map> allTopologyNodesToSourceTopics) { this.inputPartitions = topicPartitions; - topology.updateSourceTopics(nodeToSourceTopics); + topology.updateSourceTopics(allTopologyNodesToSourceTopics); } @Override diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorTopology.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorTopology.java index 4ee3c0e04db41..c4821c280436b 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorTopology.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorTopology.java @@ -149,24 +149,30 @@ public boolean hasPersistentGlobalStore() { return false; } - public void updateSourceTopics(final Map> sourceTopicsByName) { - if (!sourceTopicsByName.keySet().equals(sourceNodesByName.keySet())) { - log.error("Set of source nodes do not match: \n" + - "sourceNodesByName = {}\n" + - "sourceTopicsByName = {}", - sourceNodesByName.keySet(), sourceTopicsByName.keySet()); - throw new IllegalStateException("Tried to update source topics but source nodes did not match"); - } + public void updateSourceTopics(final Map> allSourceTopicsByNodeName) { sourceNodesByTopic.clear(); - for (final Map.Entry> sourceEntry : sourceTopicsByName.entrySet()) { - final String nodeName = sourceEntry.getKey(); - for (final String topic : sourceEntry.getValue()) { + for (final Map.Entry> sourceNodeEntry : sourceNodesByName.entrySet()) { + final String sourceNodeName = sourceNodeEntry.getKey(); + final SourceNode sourceNode = sourceNodeEntry.getValue(); + + final List updatedSourceTopics = allSourceTopicsByNodeName.get(sourceNodeName); + if (updatedSourceTopics == null) { + log.error("Unable to find source node {} in updated topics map {}", + sourceNodeName, allSourceTopicsByNodeName); + throw new IllegalStateException("Node " + sourceNodeName + " not found in full topology"); + } + + log.trace("Updating source node {} with new topics {}", sourceNodeName, updatedSourceTopics); + for (final String topic : updatedSourceTopics) { if (sourceNodesByTopic.containsKey(topic)) { + log.error("Tried to subscribe topic {} to two nodes when updating topics from {}", + topic, allSourceTopicsByNodeName); throw new IllegalStateException("Topic " + topic + " was already registered to source node " - + sourceNodesByTopic.get(topic).name()); + + sourceNodesByTopic.get(topic).name()); } - sourceNodesByTopic.put(topic, sourceNodesByName.get(nodeName)); + sourceNodesByTopic.put(topic, sourceNode); } + } } diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java index 9fc2e2fd50f66..c4aa4b3965311 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java @@ -502,8 +502,8 @@ public void closeDirty() { } @Override - public void update(final Set topicPartitions, final Map> nodeToSourceTopics) { - super.update(topicPartitions, nodeToSourceTopics); + public void update(final Set topicPartitions, final Map> allTopologyNodesToSourceTopics) { + super.update(topicPartitions, allTopologyNodesToSourceTopics); partitionGroup.updatePartitions(topicPartitions, recordQueueCreator::createQueue); } diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/Task.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/Task.java index 948e212f1183e..90aec7f48bb0c 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/Task.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/Task.java @@ -153,7 +153,7 @@ default boolean needsInitializationOrRestoration() { /** * Updates input partitions and topology after rebalance */ - void update(final Set topicPartitions, final Map> nodeToSourceTopics); + void update(final Set topicPartitions, final Map> allTopologyNodesToSourceTopics); /** * Attempt a clean close but do not close the underlying state diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java index c4393619413ac..593717b1b97f0 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java @@ -435,7 +435,7 @@ private void cleanUpTaskProducer(final Task task, private void updateInputPartitionsAndResume(final Task task, final Set topicPartitions) { final boolean requiresUpdate = !task.inputPartitions().equals(topicPartitions); if (requiresUpdate) { - log.trace("Update task {} inputPartitions: current {}, new {}", task, task.inputPartitions(), topicPartitions); + log.debug("Update task {} inputPartitions: current {}, new {}", task, task.inputPartitions(), topicPartitions); for (final TopicPartition inputPartition : task.inputPartitions()) { partitionToTask.remove(inputPartition); } diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java index 6579f47139fd3..bdfb8aa2a5718 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java @@ -55,6 +55,7 @@ import org.junit.rules.TestName; import java.io.IOException; +import java.time.Duration; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; @@ -67,6 +68,7 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.regex.Pattern; +import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.startApplicationAndWaitUntilRunning; import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.CoreMatchers.is; import static org.hamcrest.MatcherAssert.assertThat; @@ -189,7 +191,7 @@ public void subscribe(final Pattern topics, final ConsumerRebalanceListener list } @Test - public void testRegexRecordsAreProcessedAfterReassignment() throws Exception { + public void testRegexRecordsAreProcessedAfterNewTopicCreatedWithMultipleSubtopologies() throws Exception { final String topic1 = "TEST-TOPIC-1"; final String topic2 = "TEST-TOPIC-2"; @@ -198,9 +200,16 @@ public void testRegexRecordsAreProcessedAfterReassignment() throws Exception { final StreamsBuilder builder = new StreamsBuilder(); final KStream pattern1Stream = builder.stream(Pattern.compile("TEST-TOPIC-\\d")); - pattern1Stream.to(outputTopic, Produced.with(Serdes.String(), Serdes.String())); + final KStream otherStream = builder.stream(Pattern.compile("not-a-match")); + + pattern1Stream + .selectKey((k, v) -> k) + .groupByKey() + .aggregate(() -> "", (k, v, a) -> v) + .toStream().to(outputTopic, Produced.with(Serdes.String(), Serdes.String())); + streams = new KafkaStreams(builder.build(), streamsConfiguration); - streams.start(); + startApplicationAndWaitUntilRunning(Collections.singletonList(streams), Duration.ofSeconds(30)); CLUSTER.createTopic(topic2); diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java index 5ec6f0f3bf9a3..93ce84b9e4560 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java @@ -2944,7 +2944,7 @@ public void closeCleanAndRecycleState() { } @Override - public void update(final Set topicPartitions, final Map> nodeToSourceTopics) { + public void update(final Set topicPartitions, final Map> allTopologyNodesToSourceTopics) { inputPartitions = topicPartitions; } From 46f2a32dd8cc5638df3ca6b8413059df0704ae11 Mon Sep 17 00:00:00 2001 From: ableegoldman Date: Tue, 24 Nov 2020 10:42:40 -0800 Subject: [PATCH 2/3] review feedback --- .../streams/integration/RegexSourceIntegrationTest.java | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java index bdfb8aa2a5718..e3999982cfcd5 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java @@ -31,6 +31,7 @@ import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.StreamsBuilder; import org.apache.kafka.streams.StreamsConfig; +import org.apache.kafka.streams.Topology; import org.apache.kafka.streams.TopologyWrapper; import org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler; import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster; @@ -72,6 +73,7 @@ import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.CoreMatchers.is; import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.greaterThan; /** * End-to-end integration test based on using regex and named topics for creating sources, using @@ -208,7 +210,10 @@ public void testRegexRecordsAreProcessedAfterNewTopicCreatedWithMultipleSubtopol .aggregate(() -> "", (k, v, a) -> v) .toStream().to(outputTopic, Produced.with(Serdes.String(), Serdes.String())); - streams = new KafkaStreams(builder.build(), streamsConfiguration); + final Topology topology = builder.build(); + assertThat(topology.describe().subtopologies().size(), greaterThan(1)); + streams = new KafkaStreams(topology, streamsConfiguration); + startApplicationAndWaitUntilRunning(Collections.singletonList(streams), Duration.ofSeconds(30)); CLUSTER.createTopic(topic2); From 60b95760213142ba1159d79bb1c1b1765d6ecb6b Mon Sep 17 00:00:00 2001 From: ableegoldman Date: Tue, 24 Nov 2020 14:12:31 -0800 Subject: [PATCH 3/3] add unit test --- .../internals/ProcessorTopologyTest.java | 16 ++++++++++++++++ 1 file changed, 16 insertions(+) diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java index f5d3a8acbd3f1..8010dcf65d7a0 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java @@ -59,6 +59,8 @@ import java.util.function.Supplier; import static java.util.Arrays.asList; +import static org.apache.kafka.common.utils.Utils.mkEntry; +import static org.apache.kafka.common.utils.Utils.mkMap; import static org.apache.kafka.common.utils.Utils.mkSet; import static org.hamcrest.CoreMatchers.containsString; import static org.hamcrest.CoreMatchers.equalTo; @@ -170,6 +172,20 @@ public void shouldUpdateSourceTopicsWithRemovedTopic() { assertNull(processorTopology.source("topic-2")); } + @Test + public void shouldUpdateSourceTopicsOnlyForSourceNodesWithinTheSubtopology() { + topology.addSource("source-1", "topic-1"); + final ProcessorTopology processorTopology = topology.getInternalBuilder("X").buildTopology(); + + processorTopology.updateSourceTopics(mkMap( + mkEntry("source-1", Collections.singletonList("topic-1")), + mkEntry("source-2", Collections.singletonList("topic-2"))) + ); + + assertNull(processorTopology.source("topic-2")); + assertThat(processorTopology.sources().size(), equalTo(1)); + } + @Test public void testDrivingSimpleTopology() { final int partition = 10;