Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -154,9 +154,9 @@ final void transitionTo(final Task.State newState) {
}

@Override
public void update(final Set<TopicPartition> topicPartitions, final Map<String, List<String>> nodeToSourceTopics) {
Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since the root cause of this bug was basically just confusion over what exactly this set contains, a renaming feels in order

public void update(final Set<TopicPartition> topicPartitions, final Map<String, List<String>> allTopologyNodesToSourceTopics) {
this.inputPartitions = topicPartitions;
topology.updateSourceTopics(nodeToSourceTopics);
topology.updateSourceTopics(allTopologyNodesToSourceTopics);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -149,24 +149,30 @@ public boolean hasPersistentGlobalStore() {
return false;
}

public void updateSourceTopics(final Map<String, List<String>> sourceTopicsByName) {
if (!sourceTopicsByName.keySet().equals(sourceNodesByName.keySet())) {
log.error("Set of source nodes do not match: \n" +
"sourceNodesByName = {}\n" +
"sourceTopicsByName = {}",
sourceNodesByName.keySet(), sourceTopicsByName.keySet());
throw new IllegalStateException("Tried to update source topics but source nodes did not match");
}
public void updateSourceTopics(final Map<String, List<String>> allSourceTopicsByNodeName) {
Comment thread
ableegoldman marked this conversation as resolved.
sourceNodesByTopic.clear();
for (final Map.Entry<String, List<String>> sourceEntry : sourceTopicsByName.entrySet()) {
final String nodeName = sourceEntry.getKey();
for (final String topic : sourceEntry.getValue()) {
for (final Map.Entry<String, SourceNode<?, ?, ?, ?>> sourceNodeEntry : sourceNodesByName.entrySet()) {
Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In addition to removing the faulty check, I slightly refactored this loop so that we only loop over the source nodes in this particular subtopology. Previously we would have added entries for all source nodes across the entire topology to our sourceNodesByTopic map

final String sourceNodeName = sourceNodeEntry.getKey();
final SourceNode<?, ?, ?, ?> sourceNode = sourceNodeEntry.getValue();

final List<String> updatedSourceTopics = allSourceTopicsByNodeName.get(sourceNodeName);
if (updatedSourceTopics == null) {
log.error("Unable to find source node {} in updated topics map {}",
sourceNodeName, allSourceTopicsByNodeName);
throw new IllegalStateException("Node " + sourceNodeName + " not found in full topology");
}

log.trace("Updating source node {} with new topics {}", sourceNodeName, updatedSourceTopics);
for (final String topic : updatedSourceTopics) {
if (sourceNodesByTopic.containsKey(topic)) {
log.error("Tried to subscribe topic {} to two nodes when updating topics from {}",
topic, allSourceTopicsByNodeName);
throw new IllegalStateException("Topic " + topic + " was already registered to source node "
+ sourceNodesByTopic.get(topic).name());
+ sourceNodesByTopic.get(topic).name());
}
sourceNodesByTopic.put(topic, sourceNodesByName.get(nodeName));
sourceNodesByTopic.put(topic, sourceNode);
}

}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -502,8 +502,8 @@ public void closeDirty() {
}

@Override
public void update(final Set<TopicPartition> topicPartitions, final Map<String, List<String>> nodeToSourceTopics) {
super.update(topicPartitions, nodeToSourceTopics);
public void update(final Set<TopicPartition> topicPartitions, final Map<String, List<String>> allTopologyNodesToSourceTopics) {
super.update(topicPartitions, allTopologyNodesToSourceTopics);
partitionGroup.updatePartitions(topicPartitions, recordQueueCreator::createQueue);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ default boolean needsInitializationOrRestoration() {
/**
* Updates input partitions and topology after rebalance
*/
void update(final Set<TopicPartition> topicPartitions, final Map<String, List<String>> nodeToSourceTopics);
void update(final Set<TopicPartition> topicPartitions, final Map<String, List<String>> allTopologyNodesToSourceTopics);

/**
* Attempt a clean close but do not close the underlying state
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -435,7 +435,7 @@ private void cleanUpTaskProducer(final Task task,
private void updateInputPartitionsAndResume(final Task task, final Set<TopicPartition> topicPartitions) {
final boolean requiresUpdate = !task.inputPartitions().equals(topicPartitions);
if (requiresUpdate) {
log.trace("Update task {} inputPartitions: current {}, new {}", task, task.inputPartitions(), topicPartitions);
log.debug("Update task {} inputPartitions: current {}, new {}", task, task.inputPartitions(), topicPartitions);
for (final TopicPartition inputPartition : task.inputPartitions()) {
partitionToTask.remove(inputPartition);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.TopologyWrapper;
import org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler;
import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
Expand All @@ -55,6 +56,7 @@
import org.junit.rules.TestName;

import java.io.IOException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
Expand All @@ -67,9 +69,11 @@
import java.util.concurrent.atomic.AtomicInteger;
import java.util.regex.Pattern;

import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.startApplicationAndWaitUntilRunning;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.greaterThan;

/**
* End-to-end integration test based on using regex and named topics for creating sources, using
Expand Down Expand Up @@ -189,7 +193,7 @@ public void subscribe(final Pattern topics, final ConsumerRebalanceListener list
}

@Test
public void testRegexRecordsAreProcessedAfterReassignment() throws Exception {
public void testRegexRecordsAreProcessedAfterNewTopicCreatedWithMultipleSubtopologies() throws Exception {
final String topic1 = "TEST-TOPIC-1";
final String topic2 = "TEST-TOPIC-2";

Expand All @@ -198,9 +202,19 @@ public void testRegexRecordsAreProcessedAfterReassignment() throws Exception {

final StreamsBuilder builder = new StreamsBuilder();
final KStream<String, String> pattern1Stream = builder.stream(Pattern.compile("TEST-TOPIC-\\d"));
pattern1Stream.to(outputTopic, Produced.with(Serdes.String(), Serdes.String()));
streams = new KafkaStreams(builder.build(), streamsConfiguration);
streams.start();
final KStream<String, String> otherStream = builder.stream(Pattern.compile("not-a-match"));
Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We didn't catch the bug in this test for two reasons: it has only one subtopology, and it didn't wait for Streams to get to RUNNING before it created the new topic. So we weren't even covering the "update source topics" code path since all topics existed by the first assignment


pattern1Stream
Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Technically it's sufficient to just add the second KStream above for a multi-subtopology application, but I felt the test coverage could only stand to benefit with (slightly) more complicated examples

.selectKey((k, v) -> k)
.groupByKey()
.aggregate(() -> "", (k, v, a) -> v)
.toStream().to(outputTopic, Produced.with(Serdes.String(), Serdes.String()));
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would even add otherStream like:

Suggested change
.toStream().to(outputTopic, Produced.with(Serdes.String(), Serdes.String()));
.toStream()
.merge(otherStream)
.to(outputTopic, Produced.with(Serdes.String(), Serdes.String()));

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

But if we merge then won't that merge the subtopologies as well? (We would still have two subtopologies due to the upstream key-changing operation/repartition, but I wanted the test to cover different "kinds" of subtopologies like this)

Copy link
Copy Markdown
Member

@cadonna cadonna Nov 24, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You would have a source node in one sub-topology and a source node in the other sub-topology. I thought that was the pattern in the bug report, but I now realize that the bug report uses a pattern similar to the one you specified. Wouldn't it make sense to test both?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There are two ways to have more than one source node: either reading from a different input topic/pattern, or via a repartition. I agree that we should test both of these cases, but I'd prefer to do so in a single integration test rather than in two separate integration tests, to avoid making the test suite even longer


final Topology topology = builder.build();
assertThat(topology.describe().subtopologies().size(), greaterThan(1));
streams = new KafkaStreams(topology, streamsConfiguration);

startApplicationAndWaitUntilRunning(Collections.singletonList(streams), Duration.ofSeconds(30));

CLUSTER.createTopic(topic2);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,8 @@
import java.util.function.Supplier;

import static java.util.Arrays.asList;
import static org.apache.kafka.common.utils.Utils.mkEntry;
import static org.apache.kafka.common.utils.Utils.mkMap;
import static org.apache.kafka.common.utils.Utils.mkSet;
import static org.hamcrest.CoreMatchers.containsString;
import static org.hamcrest.CoreMatchers.equalTo;
Expand Down Expand Up @@ -170,6 +172,20 @@ public void shouldUpdateSourceTopicsWithRemovedTopic() {
assertNull(processorTopology.source("topic-2"));
}

@Test
public void shouldUpdateSourceTopicsOnlyForSourceNodesWithinTheSubtopology() {
    // This subtopology contains exactly one source node, "source-1".
    topology.addSource("source-1", "topic-1");
    final ProcessorTopology processorTopology = topology.getInternalBuilder("X").buildTopology();

    // Apply an update map that spans the full topology, including "source-2",
    // a node that does not belong to this subtopology.
    processorTopology.updateSourceTopics(
        mkMap(
            mkEntry("source-1", Collections.singletonList("topic-1")),
            mkEntry("source-2", Collections.singletonList("topic-2"))
        )
    );

    // Only the node within this subtopology should have been registered;
    // the foreign node's topic must not appear here.
    assertThat(processorTopology.sources().size(), equalTo(1));
    assertNull(processorTopology.source("topic-2"));
}

@Test
public void testDrivingSimpleTopology() {
final int partition = 10;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2944,7 +2944,7 @@ public void closeCleanAndRecycleState() {
}

@Override
public void update(final Set<TopicPartition> topicPartitions, final Map<String, List<String>> nodeToSourceTopics) {
public void update(final Set<TopicPartition> topicPartitions, final Map<String, List<String>> allTopologyNodesToSourceTopics) {
inputPartitions = topicPartitions;
}

Expand Down