Skip to content

KAFKA-13488: Producer fails to recover if topic gets deleted midway#11552

Merged
dajac merged 6 commits intoapache:trunkfrom
prat0318:trunk
Dec 16, 2021
Merged

KAFKA-13488: Producer fails to recover if topic gets deleted midway#11552
dajac merged 6 commits intoapache:trunkfrom
prat0318:trunk

Conversation

@prat0318
Copy link
Copy Markdown
Contributor

Allow LeaderEpoch to be re-assigned to the new value from the Metadata Response if oldTopicId is not present in the cache. This is needed because oldTopicId is removed from the cache if the topic gets deleted but the LeaderEpoch is not removed. Hence, metadata for the newly recreated topic won't be accepted unless we allow oldTopicId to be null.

This is a fix on top of earlier made #10952 and #11004 PRs but still don't solve the bug mentioned in KAFKA-13488. This is now fixed in this PR.

Committer Checklist (excluded from commit message)

  • Verify design and implementation
  • Verify test coverage and CI build status
  • Verify documentation (including upgrade notes)

Copy link
Copy Markdown
Contributor

@hachikuji hachikuji left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the patch! I left a few comments for consideration.

Integer currentEpoch = lastSeenLeaderEpochs.get(tp);
if (topicId != null && oldTopicId != null && !topicId.equals(oldTopicId)) {
// oldTopicId can be null (when metadata is fetched during topic recreation), update the metadata in that case as well.
if (topicId != null && !topicId.equals(oldTopicId)) {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this approach is reasonable. The root of the problem is the loose binding between the leader epoch and the topicId. I'm still a bit tempted to take this a little further and only permit the epoch check when the topicIds are matching. This is a little difficult to do at the moment because the epoch may be learned in contexts where we have not yet exposed the topicId. Hopefully this can be tightened up in https://issues.apache.org/jira/browse/KAFKA-13447. For now, this workaround seems ok.

int newEpoch = partitionMetadata.leaderEpoch.get();
Integer currentEpoch = lastSeenLeaderEpochs.get(tp);
if (topicId != null && oldTopicId != null && !topicId.equals(oldTopicId)) {
// oldTopicId can be null (when metadata is fetched during topic recreation), update the metadata in that case as well.
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How about this?

// Between the time that a topic is deleted and re-created, the client may lose
// track of the corresponding topicId (i.e. `oldTopicid` will be null). In this case, 
// when we discover the new topicId, we allow the corresponding leader epoch
// to override the last seen value.

Also, would it make sense to move this into the corresponding branch that it applies to?

Copy link
Copy Markdown
Contributor Author

@prat0318 prat0318 Dec 1, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ack. Changed the comment as per the suggestion.

would it make sense to move this into the corresponding branch that it applies to?

Sorry, couldn't get it. Can you elaborate on this please. (Do you mean a separate if branch? The current If branch deals with separate topicId, so that should be the one we should modify as part of this patch.)

Comment thread clients/src/main/java/org/apache/kafka/clients/Metadata.java Outdated
Comment thread core/src/test/scala/integration/kafka/api/ProducerSendWhileDeletionTest.scala Outdated
// We should treat an added topic ID as though it is the same topic. Handle only when epoch increases.
// Don't update to an older one
metadataResponse = RequestTestUtils.metadataUpdateWithIds("dummy", 1, Collections.emptyMap(), Collections.singletonMap("topic-1", 1), _tp -> 1, topicIds);
// If the Older topic Id is null, we should go with the new TopicId as the leader Epoch
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It might be worthwhile having a separate case which goes through the sequence described in the jira. Basically this:

  1. Receive metadata response with topicID A.
  2. Receive metadata response with UNKNOWN_TOPIC error.
  3. Receive metadata response with topicID B.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

added a new testcase with the suggested flow. Old test-case still needs to be changed as that case fails now, so modified the case as per the new changes.

Copy link
Copy Markdown
Member

@dajac dajac Dec 16, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: Older -> older; topic Id -> topic ID; TopicId -> topic ID; Epoch -> epoch.

@prat0318 prat0318 requested a review from hachikuji December 1, 2021 06:19
@prat0318
Copy link
Copy Markdown
Contributor Author

prat0318 commented Dec 3, 2021

@hachikuji Made the suggested changes. Please review.

@prat0318
Copy link
Copy Markdown
Contributor Author

prat0318 commented Dec 6, 2021

@hachikuji Gentle bump on the review. PTAL.

@prat0318
Copy link
Copy Markdown
Contributor Author

@hachikuji @jolshan Bump on the review.

Copy link
Copy Markdown
Member

@dajac dajac left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@prat0318 Thanks for the PR! I do agree with @hachikuji that the workaround looks reasonable. I have left some comments below (mainly nits).

Comment thread clients/src/main/java/org/apache/kafka/clients/Metadata.java Outdated
Comment thread clients/src/test/java/org/apache/kafka/clients/MetadataTest.java Outdated
Comment thread clients/src/test/java/org/apache/kafka/clients/MetadataTest.java Outdated
Comment thread clients/src/test/java/org/apache/kafka/clients/MetadataTest.java Outdated
Comment thread clients/src/test/java/org/apache/kafka/clients/MetadataTest.java Outdated
Comment thread core/src/test/scala/integration/kafka/api/ProducerSendWhileDeletionTest.scala Outdated
* See the License for the specific language governing permissions and
* limitations under the License.
*/

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: Empty line could be removed.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

removed.

Comment thread core/src/test/scala/integration/kafka/api/ProducerSendWhileDeletionTest.scala Outdated
Comment thread core/src/test/scala/integration/kafka/api/ProducerSendWhileDeletionTest.scala Outdated
Comment thread core/src/test/scala/integration/kafka/api/ProducerSendWhileDeletionTest.scala Outdated
@prat0318 prat0318 requested a review from dajac December 15, 2021 18:57
MetadataResponse metadataResponse = emptyMetadataResponse();
metadata.updateWithCurrentRequestVersion(metadataResponse, false, 0L);

Map<String, Uuid> topicIds = Collections.singletonMap("topic-1", Uuid.randomUuid());
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: We could have kept the comment before the block to be consistent with the other two blocks. I just wanted to rework it a bit.

metadata.updateWithCurrentRequestVersion(metadataResponse, false, 1L);
assertEquals(Optional.of(10), metadata.lastSeenLeaderEpoch(tp));

// Create topic-1 again but this time with a topic ID bar. LeaderEpoch should be updated to new even if lower.
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: Create topic-1 again but this time with a topic ID.?

new TopicPartition(topic, 1) -> Seq(1, 0)
)

// Change leader to 1 for both the partitions to increase leader Epoch from 0 -> 1
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: Epoch -> epoch.

@dajac
Copy link
Copy Markdown
Member

dajac commented Dec 16, 2021

@prat0318 Thanks for the update. I left a few more nits to fix typos. Would you have time to quickly address them?

@prat0318 prat0318 requested a review from dajac December 16, 2021 09:32
@prat0318
Copy link
Copy Markdown
Contributor Author

@prat0318 Thanks for the update. I left a few more nits to fix typos. Would you have time to quickly address them?

@dajac Thanks again for the review. I have addressed the suggestions. Please re-review.

Copy link
Copy Markdown
Member

@dajac dajac left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM, thanks for the patch!

@dajac dajac merged commit f653cb7 into apache:trunk Dec 16, 2021
dajac pushed a commit that referenced this pull request Dec 16, 2021
…11552)

Allow the leader epoch to be re-assigned to the new value from the Metadata response if `oldTopicId` is not present in the cache. This is needed because `oldTopicId` is removed from the cache if the topic gets deleted but the leader epoch is not removed. Hence, metadata for the newly recreated topic won't be accepted unless we allow `oldTopicId` to be null.

Reviewers: Jason Gustafson <jason@confluent.io>, David Jacot <djacot@confluent.io>
dajac pushed a commit that referenced this pull request Dec 16, 2021
…11552)

Allow the leader epoch to be re-assigned to the new value from the Metadata response if `oldTopicId` is not present in the cache. This is needed because `oldTopicId` is removed from the cache if the topic gets deleted but the leader epoch is not removed. Hence, metadata for the newly recreated topic won't be accepted unless we allow `oldTopicId` to be null.

Reviewers: Jason Gustafson <jason@confluent.io>, David Jacot <djacot@confluent.io>
dajac pushed a commit that referenced this pull request Dec 16, 2021
…11552)

Allow the leader epoch to be re-assigned to the new value from the Metadata response if `oldTopicId` is not present in the cache. This is needed because `oldTopicId` is removed from the cache if the topic gets deleted but the leader epoch is not removed. Hence, metadata for the newly recreated topic won't be accepted unless we allow `oldTopicId` to be null.

Reviewers: Jason Gustafson <jason@confluent.io>, David Jacot <djacot@confluent.io>
@dajac
Copy link
Copy Markdown
Member

dajac commented Dec 16, 2021

Merged to trunk, 3.1, 3.0 and 2.8.

xdgrulez pushed a commit to xdgrulez/kafka that referenced this pull request Dec 22, 2021
…pache#11552)

Allow the leader epoch to be re-assigned to the new value from the Metadata response if `oldTopicId` is not present in the cache. This is needed because `oldTopicId` is removed from the cache if the topic gets deleted but the leader epoch is not removed. Hence, metadata for the newly recreated topic won't be accepted unless we allow `oldTopicId` to be null.

Reviewers: Jason Gustafson <jason@confluent.io>, David Jacot <djacot@confluent.io>
lmr3796 pushed a commit to lmr3796/kafka that referenced this pull request Jun 2, 2022
…pache#11552)

Allow the leader epoch to be re-assigned to the new value from the Metadata response if `oldTopicId` is not present in the cache. This is needed because `oldTopicId` is removed from the cache if the topic gets deleted but the leader epoch is not removed. Hence, metadata for the newly recreated topic won't be accepted unless we allow `oldTopicId` to be null.

Reviewers: Jason Gustafson <jason@confluent.io>, David Jacot <djacot@confluent.io>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants