
KAFKA-12257: Consumer mishandles topics deleted and recreated with the same name#10952

Merged
hachikuji merged 7 commits into apache:3.0 from jolshan:KAFKA-12257
Jul 15, 2021
Conversation

@jolshan (Member) commented Jul 1, 2021

Store topic ID info in consumer metadata. We will always take the topic ID from the latest metadata response and remove any topic IDs from the cache if the metadata response did not return a topic ID for the topic.

With the addition of topic IDs, when we encounter a new topic ID (a recreated topic), we can choose to accept the topic's metadata even if its epoch is lower than that of the deleted topic.

The idea is that when we update from no topic IDs to using topic IDs, we will not count the topic as new (it could be the same topic, just with a newly assigned ID). We will only take the update if the topic ID actually changed.

Added tests for this scenario as well as some tests for storing the topic IDs. Also added tests for topic IDs in metadata cache.
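The cache-update rule described above can be sketched as follows. This is a simplified illustration, not the actual `Metadata`/`MetadataCache` code, and it uses `java.util.UUID` in place of Kafka's `org.apache.kafka.common.Uuid`:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

public class TopicIdCacheSketch {
    // Rebuild the topic ID cache from the latest metadata response. A topic
    // whose entry maps to null (no ID returned) is dropped from the cache,
    // so stale IDs never survive a response that omits them.
    static Map<String, UUID> update(Map<String, UUID> latestResponse) {
        Map<String, UUID> newTopicIds = new HashMap<>();
        for (Map.Entry<String, UUID> entry : latestResponse.entrySet()) {
            if (entry.getValue() != null)
                newTopicIds.put(entry.getKey(), entry.getValue());
            // else: the latest metadata did not return a topic ID, so any
            // previously cached ID for this topic is discarded
        }
        return newTopicIds;
    }
}
```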

Committer Checklist (excluded from commit message)

  • Verify design and implementation
  • Verify test coverage and CI build status
  • Verify documentation (including upgrade notes)

return Optional.ofNullable(metadataByPartition.get(topicPartition));
}

Map<String, Uuid> topicIds() {
Contributor:

Do we need to expose the map or could we just have lookup methods:

Uuid topicId(String topicName);
String topicName(Uuid topicId);

Member Author:

Hmm. I suppose we could have lookup methods. This has implications for the Fetch PR though.

Member Author:

It is used when getting all the topic IDs to put into the fetch request/session in Fetcher there. Maybe it is ok to call a method multiple times there. I also use it in tests, but maybe we could change that usage.

Comment thread clients/src/main/java/org/apache/kafka/clients/MetadataCache.java Outdated
Comment thread clients/src/main/java/org/apache/kafka/clients/Metadata.java Outdated
Comment thread clients/src/main/java/org/apache/kafka/clients/Metadata.java Outdated
if (id != null)
newTopicIds.put(partition.topic(), id);
else
// Remove if the latest metadata does not have a topic ID
Contributor:

What is the rationale to discard topicId information? Is this to deal with downgrades?

Member Author:

Yes, for the fetch path, we want to know when topic IDs are removed as quickly as possible so we can switch over to the older fetch version that uses topic names.
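The fallback being described can be sketched like this. It is a simplified illustration with hypothetical names, not the actual `Fetcher`/fetch-session code; the version numbers reflect that topic IDs were added to the Fetch request in version 13 (KIP-516):

```java
import java.util.Collection;
import java.util.UUID;

public class FetchVersionSketch {
    // Topic IDs were added to the Fetch request in version 13 (KIP-516);
    // older versions identify topics by name.
    static final short FETCH_WITH_TOPIC_IDS = 13;
    static final short FETCH_WITH_TOPIC_NAMES = 12;

    // If any topic in the session is missing an ID (e.g. during a cluster
    // upgrade or downgrade), the whole request must fall back to the
    // name-based fetch version.
    static short chooseFetchVersion(Collection<UUID> sessionTopicIds) {
        for (UUID id : sessionTopicIds) {
            if (id == null)
                return FETCH_WITH_TOPIC_NAMES;
        }
        return FETCH_WITH_TOPIC_IDS;
    }
}
```

This is why the consumer wants removed topic IDs reflected in its cache as quickly as possible: one missing ID is enough to force the older request version.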

Contributor:

Does this still make sense in the context of 3.0, which does not have topicId fetch logic?

Member Author:

I suppose it is not needed, but I'm not sure if it helps a lot to remove it.

Contributor:

We can leave it as is I guess since I can't think of a strong case to remove it. It is a rare situation that we would hit this case and the consequence of losing the topic ID is probably not too bad. Worst case, we might miss a recreation which occurred while the cluster was rolling to upgrade or downgrade. On the other hand, it could lead to other kinds of problems if we allow updates to the epoch information tied to a topic ID without being able to validate that the topic ID is correct, so maybe this logic is for the best.

Member Author:

Yeah. That was my reasoning. I thought the upgrade/downgrade case would be rare and the guarantees harder to reason about there.

Comment thread clients/src/test/java/org/apache/kafka/clients/MetadataTest.java Outdated
@showuon (Member) left a comment:

@jolshan, thanks for the nice and simple fix! Overall LGTM! Left some minor comments. Thanks.

Comment thread clients/src/main/java/org/apache/kafka/clients/Metadata.java Outdated
Comment thread clients/src/main/java/org/apache/kafka/clients/Metadata.java
Comment thread clients/src/main/java/org/apache/kafka/clients/MetadataCache.java Outdated
@showuon (Member) commented Jul 8, 2021

Also, please remember to rebase with the latest trunk branch. Thanks.

@jolshan (Member Author) commented Jul 8, 2021

Yup. This merge conflict was caused by my other PR 😅

Comment thread clients/src/main/java/org/apache/kafka/clients/Metadata.java Outdated
log.debug("Topic ID changed, so this topic must have been recreated. " +
"Removing last seen epoch {} for the old partition {} and adding epoch {} from new metadata", currentEpoch, tp, newEpoch);
lastSeenLeaderEpochs.put(tp, newEpoch);
return Optional.of(partitionMetadata);
Contributor:
Leaving this comment here for lack of an alternative location. This patch takes a good first step in improving consumer behavior for the topic recreation case. At least we are able to detect and discard the old epoch state. In fact, it does a little more than that since, combined with the fetch validation logic, we are likely to detect that the old fetch position is no longer valid. Most likely this case would get raised to the user as a LogTruncationException, which might not be ideal, but at least is justifiable. However, it doesn't quite close the door on reuse of the fetch position since it may remain valid on the recreated topic. For the full solution, we probably need to track topicId in SubscriptionState as well so that we can force an offset reset whenever the topicId changes. I think it makes sense to do this in https://issues.apache.org/jira/browse/KAFKA-12975.
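The follow-up described here (tracked in KAFKA-12975) could look roughly like the sketch below. All names are hypothetical; the real change would live in `SubscriptionState`, which this sketch does not reproduce:

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.UUID;

public class TopicIdResetSketch {
    // Last topic ID observed for each subscribed topic.
    private final Map<String, UUID> seenTopicIds = new HashMap<>();
    // Topics whose fetch position must be reset before fetching again.
    private final Set<String> needsOffsetReset = new HashSet<>();

    // Called on each metadata update. A changed topic ID means the topic
    // was deleted and recreated, so the old fetch position cannot be
    // reused even if it happens to still be a valid offset.
    void onMetadataUpdate(String topic, UUID topicId) {
        UUID previous = seenTopicIds.put(topic, topicId);
        if (previous != null && topicId != null && !previous.equals(topicId))
            needsOffsetReset.add(topic);
    }

    boolean requiresOffsetReset(String topic) {
        return needsOffsetReset.contains(topic);
    }
}
```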

Member Author:

I see. I think the main issue here was that we would ignore metadata updates when we were simply looking at the epoch. I believe that this PR solves the problem, but we can continue to improve beyond this.

Contributor:

Yes, I was just pointing out that there is still a gap.

Comment thread clients/src/main/java/org/apache/kafka/clients/MetadataCache.java Outdated

Comment thread clients/src/main/java/org/apache/kafka/clients/Metadata.java Outdated
lastSeenLeaderEpochs.put(tp, newEpoch);
return Optional.of(partitionMetadata);
// If both topic IDs were valid and the topic ID changed, update the metadata
} else if (!topicId.equals(Uuid.ZERO_UUID) && oldTopicId != null && !topicId.equals(oldTopicId)) {
Contributor:

Hmm, shouldn't this check come before the epoch check? Admittedly, it's unlikely that a recreated topic would have a higher epoch, but we may as well handle that case.

By the way, it's a little inconsistent that this check uses both null and Uuid.ZERO_UUID to represent a missing value. Maybe we can use null consistently?
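One way to get the consistency asked for here is to normalize the wire-level sentinel to null at the boundary. A minimal sketch, using `java.util.UUID` in place of Kafka's `Uuid` (whose `Uuid.ZERO_UUID` is the all-zero sentinel):

```java
import java.util.UUID;

public class UuidNormalizeSketch {
    // The wire format uses an all-zero UUID to mean "no topic ID";
    // normalizing it to null at the edge lets the in-memory code use a
    // single missing-value convention.
    static final UUID ZERO_UUID = new UUID(0L, 0L);

    static UUID normalize(UUID topicId) {
        return ZERO_UUID.equals(topicId) ? null : topicId;
    }
}
```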

Member Author:

This bugged me a bit too. The issue is that the request itself uses Uuid.ZERO_UUID, so we'd just have to convert that to null. We can do that if it is clearer to read.

Member Author:

Also, these checks result in the same thing. The only difference is the log debug line. If it makes more sense to log the topic ID change, I can switch the order.

Contributor:

Yes, the logging is what I had in mind. The log message is misleading otherwise.

Member Author:

Ah ok. I'll switch it.
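The agreed reordering can be illustrated with a simplified sketch (hypothetical method, not the actual `Metadata` code): the topic ID comparison runs first, so a recreated topic is both accepted and logged as a recreation, and only then does the usual leader-epoch check apply.

```java
import java.util.UUID;

public class EpochCheckOrderSketch {
    // Returns true if the new partition metadata should replace the cached
    // metadata. The topic ID check comes before the epoch check because a
    // recreated topic may legitimately report a lower leader epoch.
    static boolean shouldUpdate(UUID oldTopicId, UUID newTopicId,
                                int currentEpoch, int newEpoch) {
        if (newTopicId != null && oldTopicId != null && !newTopicId.equals(oldTopicId)) {
            // Topic ID changed: the topic was recreated, so accept the new
            // metadata (and log the recreation) regardless of the epoch.
            return true;
        }
        // Same topic: apply the usual monotonic leader-epoch check.
        return newEpoch >= currentEpoch;
    }
}
```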

Comment thread clients/src/main/java/org/apache/kafka/clients/Metadata.java Outdated
Comment thread clients/src/main/java/org/apache/kafka/clients/Metadata.java Outdated
Comment thread clients/src/main/java/org/apache/kafka/clients/Metadata.java
Comment thread clients/src/test/java/org/apache/kafka/clients/MetadataTest.java Outdated
Comment thread clients/src/test/java/org/apache/kafka/clients/MetadataTest.java Outdated
Comment thread clients/src/test/java/org/apache/kafka/clients/MetadataTest.java
Comment thread clients/src/test/java/org/apache/kafka/clients/MetadataTest.java Outdated
Comment thread clients/src/test/java/org/apache/kafka/clients/MetadataTest.java Outdated
@hachikuji (Contributor) left a comment:

LGTM. Thanks for the patch!

@hachikuji hachikuji merged commit 52ca4bc into apache:3.0 Jul 15, 2021
hachikuji pushed a commit that referenced this pull request Nov 15, 2021
KAFKA-12257: Consumer mishandles topics deleted and recreated with the same name (#10952)

Store topic ID info in consumer metadata. We will always take the topic ID from the latest metadata response and remove any topic IDs from the cache if the metadata response did not return a topic ID for the topic. The benefit of this is that it lets us detect topic recreations. This allows the client to update metadata even if the leader epoch is lower than what was seen previously.

Reviewers: Luke Chen <showuon@gmail.com>, Jason Gustafson <jason@confluent.io>
