KAFKA-15605: Fix topic deletion handling during ZK migration#14545

Merged
mumrah merged 14 commits into apache:trunk from mumrah:KAFKA-15605-migrate-topic-deletions
Oct 26, 2023

Conversation

@mumrah
Member

@mumrah mumrah commented Oct 13, 2023

This patch adds reconciliation logic to migrating ZK brokers to deal with pending topic deletions as well as missed StopReplicas.

During the hybrid mode of the ZK migration, the KRaft controller asynchronously sends UpdateMetadataRequest (UMR) and LeaderAndIsrRequest (LISR) to the ZK brokers to propagate metadata. Since this process is essentially "best effort", it is possible for a broker to miss a StopReplicas request. The new logic lets the ZK broker compare its local logs with the full set of replicas in a "Full" LISR. Any local logs that are not present in the set of replicas in the request are removed from ReplicaManager and marked as "stray".
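The reconciliation described above amounts to a set difference between what the broker has on disk and what the full request lists. A minimal sketch, assuming partitions are identified by plain strings such as `foo-0`; the class and method names are hypothetical and are not taken from the patch:

```java
import java.util.Set;
import java.util.TreeSet;

// Hypothetical illustration only; not the ReplicaManager code from this PR.
final class StrayLogSketch {
    // Returns the local partitions that the full request does not mention.
    // Partition identifiers are simplified to strings like "foo-0".
    static Set<String> findStrayPartitions(Set<String> localLogs,
                                           Set<String> partitionsInFullRequest) {
        Set<String> stray = new TreeSet<>(localLogs);
        stray.removeAll(partitionsInFullRequest);
        return stray;
    }
}
```

Only partitions missing from an exhaustive request are flagged, so the same comparison against an incremental request would wrongly flag partitions that are merely unmentioned.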

To avoid inadvertent data loss with this new behavior, the brokers do not delete the "stray" partitions. They will rename the directories and log warning messages during log recovery. It will be up to the operator to manually delete the stray partitions. We can possibly enhance this in the future to clean up old stray logs.

This patch makes use of the previously unused Type field on LeaderAndIsrRequest. This was added as part of KIP-516 but never implemented. Since its introduction, an implicit 0 was sent in all LISR. The KRaft controller will now send a value of 2 to indicate a full LISR (as specified by the KIP). The presence of this value acts as a trigger for the ZK broker to perform the log reconciliation.
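A sketch of how the Type byte might be modeled on the broker side. The values 0 and 2 come from the description above; the name `INCREMENTAL` for value 1 is an assumption based on a reading of KIP-516, and `LisrType` with its methods is a hypothetical name, not the enum in `LeaderAndIsrRequest.java`:

```java
// Hypothetical illustration only; names are not taken from the patch.
enum LisrType {
    UNKNOWN((byte) 0),      // implicit value sent before this patch
    INCREMENTAL((byte) 1),  // assumed meaning of value 1
    FULL((byte) 2);         // exhaustive request, triggers reconciliation

    private final byte code;

    LisrType(byte code) {
        this.code = code;
    }

    byte toByte() {
        return code;
    }

    // Unrecognized bytes fall back to UNKNOWN rather than failing.
    static LisrType fromByte(byte b) {
        for (LisrType t : values()) {
            if (t.code == b) {
                return t;
            }
        }
        return UNKNOWN;
    }

    boolean triggersReconciliation() {
        return this == FULL;
    }
}
```

Mapping unrecognized bytes to `UNKNOWN` keeps the reconciliation opt-in: a broker only moves logs aside when the controller explicitly says the request is full.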

@cmccabe
Contributor

cmccabe commented Oct 14, 2023

Hmm, this isn’t how we agreed to handle this. We were going to get rid of deleting state in hybrid brokers as I recall.

Then we were going to actually verify that broker topic IDs matched what was sent in LeaderAndIsr, and move aside or delete if not.

In the initial full leaderAndIsrRequest, we can use LeaderAndIsrRequest.Type = All to indicate that the list is exhaustive, and move aside topics that don't appear in there

@mumrah mumrah force-pushed the KAFKA-15605-migrate-topic-deletions branch from fb7392f to 139aa8a Compare October 18, 2023 19:20
@mumrah
Member Author

mumrah commented Oct 18, 2023

Thanks @cmccabe, I do vaguely remember that decision :). I've updated the PR based on the original design.

@mumrah mumrah marked this pull request as ready for review October 18, 2023 19:23
Comment thread core/src/main/scala/kafka/log/LogManager.scala Outdated
// Start a delete, but don't wait for it
admin = zkCluster.createAdminClient()
admin.deleteTopics(Seq("test-topic-1", "test-topic-2", "test-topic-3").asJava)
admin.close()
Contributor

Could topic deletion complete before brokers enter migration mode in the test?

Member Author

Yes, all or none of the deletions could happen before the migration starts in the test. I'll try to think of a way to make this more reproducible.

Member Author

I increased the number of topics to delete as part of this test. That should hopefully increase the likelihood of it being successful. I also added an "Assume" to the test so it will be skipped if somehow all the topics aren't pending deletion by the time the migration starts.

@cmccabe cmccabe changed the title KAFKA-15605 Handle pending topic deletions during migration KAFKA-15605 Fix topic deletion handling during ZK migration Oct 19, 2023
@cmccabe cmccabe changed the title KAFKA-15605 Fix topic deletion handling during ZK migration KAFKA-15605: Fix topic deletion handling during ZK migration Oct 19, 2023
Comment thread clients/src/main/java/org/apache/kafka/common/requests/LeaderAndIsrRequest.java Outdated
Comment thread clients/src/main/java/org/apache/kafka/common/requests/LeaderAndIsrRequest.java Outdated
Comment thread clients/src/main/java/org/apache/kafka/common/requests/LeaderAndIsrRequest.java Outdated
Comment thread clients/src/main/java/org/apache/kafka/common/requests/LeaderAndIsrRequest.java Outdated
val topicDeletions = readPendingTopicDeletions().asScala
val topicsToMigrated = allTopics -- topicDeletions
if (topicDeletions.nonEmpty) {
warn(s"Found ${topicDeletions.size} pending topic deletions: $topicDeletions. These will be not migrated " +
Contributor

I think we should probably limit listing the topics that are being deleted to like the first 10 or whatever. Otherwise we could generate quite a long log4j message.

Member Author

Yeah, I wondered about that. What about logging each deletion separately at TRACE?

Contributor

If it's too much effort, just log the count.

operationConsumer.accept(
DELETE_PENDING_TOPIC_DELETION,
"Delete pending topic deletions",
migrationState -> migrationClient.topicClient().clearPendingTopicDeletions(pendingTopicDeletions, migrationState)
Contributor

hmm, I think this logic to clear pending topic deletions should exist in the code to sync kraft state to zk state rather than here, right? I think the other stuff here just reads and doesn't write (correct?)

Member Author

Yea, this is in handleTopicsSnapshot which is sync'ing the TopicImage to ZK. Really it doesn't need to happen each time when we handle a snapshot, but I figured putting it here was better than having additional one-off logic at migration time

@cmccabe
Contributor

cmccabe commented Oct 19, 2023

Thanks for the PR, @mumrah ! A very important fix.

Do we need a test of handling a leaderAndIsrRequest that creates stray partitions, in ReplicaManagerTest?

Comment thread clients/src/main/java/org/apache/kafka/common/requests/LeaderAndIsrRequest.java Outdated
Contributor

@cmccabe cmccabe left a comment

LGTM

(but please give the enums ALL_CAPS names before committing) ;)

@cmccabe
Contributor

cmccabe commented Oct 23, 2023

oh, and please remove the thing where we log all deleting topic names on a single line. One simple thing to do instead is log each one on a separate line (this can even be at INFO level.) I just don't want super-duper long log lines.
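A minimal sketch of the one-line-per-topic logging suggested above: one summary line with the count, then one short line per topic. The class name, method name, and message wording are hypothetical, and the lines are returned as strings rather than sent to a logger so the sketch stays self-contained:

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;

// Hypothetical illustration only; not the logging code from this PR.
final class PendingDeletionLogSketch {
    // Builds a summary line followed by one line per pending deletion,
    // so no single log line grows with the number of topics.
    static List<String> buildLogLines(Collection<String> pendingDeletions) {
        List<String> lines = new ArrayList<>();
        if (pendingDeletions.isEmpty()) {
            return lines;
        }
        lines.add("Found " + pendingDeletions.size()
            + " pending topic deletions. These will not be migrated.");
        for (String topic : pendingDeletions) {
            lines.add("Pending topic deletion: " + topic);
        }
        return lines;
    }
}
```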

Contributor

@yyu1993 yyu1993 left a comment

Thanks @mumrah for the PR! Left a comment on stray log deletion.

}
}
removedLog.renameDir(UnifiedLog.logDeleteDirName(topicPartition), false)
if (isStray) {
Contributor

Looks like we are only renaming the stray log dir and removing it from LogManager. Do we also want to add the logic on delayed stray log deletion? If we want to delete the stray logs immediately (more risky, and it might create conflicts with AK merge), I think we need to add it to the log deletion queue (by calling addLogToBeDeleted(strayLog)).

Right now the log dir will be renamed to "-stray" but it will not be deleted by the broker.

Member Author

For now, the desired behavior is to stop tracking the partition but not delete the files. Since migrations are one-off and inherently risky, I didn't want to take any destructive actions like deleting the logs (immediately or delayed). The stray'd partitions are logged at the INFO level when they are detected, and at WARN on subsequent startups.

This gives operators the information needed to clean up stray partitions if desired.

I filed https://issues.apache.org/jira/browse/KAFKA-15698 to track automatic clean up of the stray partitions.
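Until automatic cleanup exists, an operator or a startup check could find the renamed directories by suffix. A minimal sketch, assuming only that stray directory names end in `-stray` as noted earlier in this thread; the class name, method name, and the example directory names in the test are hypothetical:

```java
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical illustration only; not the LogManager recovery code.
final class StrayDirScanSketch {
    static final String STRAY_SUFFIX = "-stray";

    // Returns, in sorted order, the directory names that carry the stray suffix.
    // Nothing is deleted here; the result is only meant for reporting.
    static List<String> findStrayDirs(List<String> logDirNames) {
        return logDirNames.stream()
            .filter(name -> name.endsWith(STRAY_SUFFIX))
            .sorted()
            .collect(Collectors.toList());
    }
}
```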

Contributor

Got it. Makes sense.

@mumrah
Member Author

mumrah commented Oct 26, 2023

Test failures look unrelated.

@mumrah mumrah merged commit 339d255 into apache:trunk Oct 26, 2023
@jolshan
Member

jolshan commented Oct 27, 2023

Hey -- I think this broke the build for java 8.

[Error] /home/jenkins/workspace/Kafka_kafka-pr_PR-14545/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala:2554:51: the result type of an implicit conversion must be more specific than Object

@divijvaidya
Member

divijvaidya commented Nov 6, 2023

cmccabe pushed a commit that referenced this pull request Nov 14, 2023
Reviewers: Colin P. McCabe <cmccabe@apache.org>
Conflicts:
- ReplicaManagerTest.scala: fix imports

- ZkMigrationIntegrationTest.scala: handle absence of KIP-919 changes that added a different way to
  fetch the quorum voters config.

- KRaftMigrationDriverTest.java: handle absence of KIP-919 changes that added
  setupDeltaForMigration.
mjsax pushed a commit to confluentinc/kafka that referenced this pull request Nov 22, 2023
AnatolyPopov pushed a commit to aiven/kafka that referenced this pull request Feb 16, 2024