KAFKA-12257: Consumer mishandles topics deleted and recreated with the same name (trunk)#11004

Merged
hachikuji merged 10 commits into apache:trunk from jolshan:KAFKA-12257-trunk
Nov 17, 2021

Member

@jolshan jolshan commented Jul 8, 2021

Trunk version of #10952

This PR slightly cleans up some of the changes made in #9944

Store topic ID info in consumer metadata. We will always take the topic ID from the latest metadata response and remove any topic IDs from the cache if the metadata response did not return a topic ID for the topic.

With topic IDs, when we see a new topic ID for an existing topic name (that is, the topic was recreated), we can choose to accept the topic's metadata even if its epoch is lower than the epoch last seen for the deleted topic.

When we go from having no topic ID to having one, we do not count the topic as new, since it could be the same topic with a newly assigned ID. We only take the lower-epoch update if a known topic ID changed.

Added tests for this scenario as well as some tests for storing the topic IDs. Also added tests for topic IDs in metadata cache.
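The update rule described above can be sketched roughly as follows. This is a simplified illustration, not the code in `Metadata.java`: the class `TopicIdEpochSketch` and its methods are hypothetical, `java.util.UUID` stands in for Kafka's `Uuid`, a `null` ID models a response without a topic ID, and a single epoch per topic is assumed instead of one per partition.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

// Hypothetical, simplified model of the rule; not Kafka's Metadata class.
final class TopicIdEpochSketch {
    private final Map<String, UUID> topicIds = new HashMap<>();
    private final Map<String, Integer> lastSeenEpochs = new HashMap<>();

    // Applies one metadata entry and returns true if the epoch update was accepted.
    // newId may be null when the response carries no topic ID.
    boolean update(String topic, UUID newId, int newEpoch) {
        UUID oldId = topicIds.get(topic);
        Integer oldEpoch = lastSeenEpochs.get(topic);

        // Always mirror the latest response: store the ID, or drop it if absent.
        if (newId != null) {
            topicIds.put(topic, newId);
        } else {
            topicIds.remove(topic);
        }

        // Only a change from one known ID to a different known ID counts as a recreation.
        // Going from "no ID" to "some ID" is not treated as a new topic.
        boolean recreated = oldId != null && newId != null && !oldId.equals(newId);

        if (recreated || oldEpoch == null || newEpoch >= oldEpoch) {
            lastSeenEpochs.put(topic, newEpoch);
            return true;
        }
        return false; // stale epoch for the same topic: ignore
    }

    UUID topicId(String topic) {
        return topicIds.get(topic);
    }

    public static void main(String[] args) {
        TopicIdEpochSketch metadata = new TopicIdEpochSketch();
        UUID original = UUID.randomUUID();
        UUID recreated = UUID.randomUUID();
        System.out.println(metadata.update("orders", original, 10)); // first sighting: accepted
        System.out.println(metadata.update("orders", original, 3));  // same ID, lower epoch: rejected
        System.out.println(metadata.update("orders", recreated, 0)); // new ID, lower epoch: accepted
    }
}
```

The point of the sketch is the `recreated` flag: it bypasses the epoch comparison only when both the old and new IDs are known and differ.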

Committer Checklist (excluded from commit message)

  • Verify design and implementation
  • Verify test coverage and CI build status
  • Verify documentation (including upgrade notes)

}

builder.add(partition, topicIds.getOrDefault(partition.topic(), Uuid.ZERO_UUID), new FetchRequest.PartitionData(position.offset,
Uuid topicId = metadata.topicId(partition.topic());
Member Author

One change we are making in this PR is to get the topic ID for just a single provided topic name. I want to double check that the metadata (and the underlying map) cannot change while we are adding these partitions to the builder, since the builder assumes IDs do not change.

Member Author

For my understanding -- we won't update the metadata during this method, correct? Or is there something like another thread that could update it?

Contributor

It could be updated in a separate thread. I cannot see how that would be a problem though. We do have synchronization in Metadata.

Member Author

It would have been a problem before KAFKA-13111 when we assumed only one topic ID per build for a given topic name (we had a mapping), but maybe it is ok now that we store the ID in the data and use it to build the request.

Contributor

Apologies. I was being a bit slow here. I had not considered the possibility of the ID of a given topic changing while we were building the fetch request. I had forgotten that the fetch builder logic does allow the same topic to be included multiple times. I do agree that it is probably better not to allow this, so reverting this change makes sense.
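A minimal sketch of the concern in this thread, under stated assumptions: if the name-to-ID mapping is read once as a snapshot before the request is built, every partition of the same topic resolves to the same ID even if another thread updates the metadata in the meantime. `FetchTopicIdSnapshotSketch`, `MetadataStub`, and `resolve` are hypothetical names for illustration, and `java.util.UUID` stands in for Kafka's `Uuid`; this is not the actual fetcher code.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;

// Hypothetical illustration; not Kafka's Fetcher or Metadata classes.
final class FetchTopicIdSnapshotSketch {
    // Stand-in for Kafka's Uuid.ZERO_UUID, used when no topic ID is known.
    static final UUID ZERO_UUID = new UUID(0L, 0L);

    // Hypothetical stand-in for a synchronized metadata holder.
    static final class MetadataStub {
        private final Map<String, UUID> ids = new HashMap<>();

        synchronized void setTopicId(String topic, UUID id) {
            ids.put(topic, id);
        }

        // Returns a copy, so later updates do not affect the caller's view.
        synchronized Map<String, UUID> topicIds() {
            return new HashMap<>(ids);
        }
    }

    // Resolves the ID for each partition's topic from one snapshot taken up front.
    static List<UUID> resolve(MetadataStub metadata, List<String> partitionTopics) {
        Map<String, UUID> snapshot = metadata.topicIds();
        List<UUID> resolved = new ArrayList<>();
        for (String topic : partitionTopics) {
            resolved.add(snapshot.getOrDefault(topic, ZERO_UUID));
        }
        return resolved;
    }
}
```

By contrast, a per-partition lookup against live metadata could in principle return two different IDs for the same topic name within one build, which is the case the builder does not expect.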

Member Author

jolshan commented Nov 11, 2021

I pushed some of the changes that I missed from the 3.0 branch. We'll see how the build goes. Tests seemed to look ok for me locally.

* @return the topic ID for the given topic name or null if the ID does not exist or is not known
* @return a mapping from topic names to topic IDs for all topics with valid IDs in the cache
*/
public synchronized Uuid topicId(String topicName) {
Contributor

Any harm keeping this one? Seems like it simplified some of the uses, especially in tests.

Member Author

Hmmm...this is the version that could have a new cache value? Only thing I might worry about is misuse.

Contributor

@hachikuji hachikuji left a comment

LGTM. Thanks for the patch!

@hachikuji hachikuji merged commit 06dfa54 into apache:trunk Nov 17, 2021
hachikuji pushed a commit that referenced this pull request Nov 17, 2021
…e same name (#11004)

Store topic ID info in consumer metadata. We will always take the topic ID from the latest metadata response and remove any topic IDs from the cache if the metadata response did not return a topic ID for the topic. The benefit of this is that it lets us detect topic recreations. This allows the client to update metadata even if the leader epoch is lower than what was seen previously.

Reviewers: Jason Gustafson <jason@confluent.io>