
KAFKA-12426: Missing logic to create partition.metadata files in RaftReplicaManager#10282

Merged
hachikuji merged 12 commits into apache:trunk from jolshan:KAFKA-12426
Apr 1, 2021

Conversation

@jolshan
Member

@jolshan jolshan commented Mar 8, 2021

KIP-516 introduced the partition.metadata file to persist the topic ID on the broker. It is created when handling the LeaderAndIsrRequest in ReplicaManager. (See #10143 for the code path.) RaftReplicaManager was missing the analogous code path for the KIP-500 code. As in ReplicaManager, RaftReplicaManager will now check the partition.metadata file when handling metadata records.

However, since we know that all raft topics will have topic IDs, we can simply set the ID in the log upon the log's creation.
The ReplicaManager path was updated to do the same for newly created topics.

There are some tweaks to the checking logic to better handle the scenario where the log exists but is not yet associated with the Partition (for example, on startup after a shutdown).

Tests added to ensure the file is created and that the correct error is thrown when the id is inconsistent.
Added tests for creating the log with the new topic ID parameter.

Also adds a few methods to get topic ID from MetadataImageBuilder as this is the most convenient way to get topic ID from RaftReplicaManager.
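The check described above — comparing the topic ID carried by a request against any ID already persisted for the log — can be sketched as follows. This is a minimal, hypothetical illustration in plain Java (Kafka's core is Scala, and the class and method names here are invented for the example, not Kafka's actual API):

```java
import java.util.Optional;
import java.util.UUID;

// Hypothetical sketch of the consistency check this PR adds: compare the
// topic ID from a metadata record or LeaderAndIsrRequest against any ID
// already recorded for the log.
public class TopicIdCheck {

    // Returns true if the request's topic ID is consistent with the ID
    // already recorded for the log, or if the log has no ID yet (in which
    // case the request ID is simply adopted and persisted).
    public static boolean checkTopicId(UUID requestTopicId, Optional<UUID> logTopicId) {
        return logTopicId.map(requestTopicId::equals).orElse(true);
    }

    public static void main(String[] args) {
        UUID id = UUID.randomUUID();
        System.out.println(checkTopicId(id, Optional.empty()));              // true: no ID yet
        System.out.println(checkTopicId(id, Optional.of(id)));               // true: IDs match
        System.out.println(checkTopicId(id, Optional.of(UUID.randomUUID()))); // false: mismatch
    }
}
```

The tests in this PR verify that when the IDs disagree, the appropriate inconsistent-topic-ID error is surfaced rather than the request being applied.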

Committer Checklist (excluded from commit message)

  • Verify design and implementation
  • Verify test coverage and CI build status
  • Verify documentation (including upgrade notes)

Contributor

@rondagostino rondagostino left a comment


Thanks for the PR! I had mostly stylistic comments.

Comment thread core/src/main/scala/kafka/cluster/Partition.scala Outdated
Comment thread core/src/main/scala/kafka/server/KafkaConfig.scala Outdated
Comment thread core/src/main/scala/kafka/server/RaftReplicaManager.scala Outdated
Comment thread core/src/main/scala/kafka/server/ReplicaManager.scala Outdated
Comment thread core/src/main/scala/kafka/cluster/Partition.scala Outdated
Comment thread core/src/main/scala/kafka/cluster/Partition.scala Outdated
…pass as well as reading from the log on the first pass.
Comment thread core/src/main/scala/kafka/cluster/Partition.scala Outdated
  for (TopicPartition topicPartition : topicPartitions) {
      final Partition partition = this.replicaManager.createPartition(topicPartition);
-     partition.createLogIfNotExists(true, false, checkpoints);
+     partition.createLogIfNotExists(true, false, checkpoints, Option.empty());
Member Author


I think we can just not set the topic ID here, but want to confirm.

Contributor


Seems no harm either way, but why not set one anyway since that will be the default behavior going forward?

Comment thread core/src/main/scala/kafka/log/Log.scala Outdated
Comment thread core/src/main/scala/kafka/cluster/Partition.scala Outdated
Comment thread core/src/main/scala/kafka/cluster/Partition.scala Outdated

- if (!partition.checkOrSetTopicId(requestTopicId)) {
+ val (topicIdOpt, partitionLogOpt) = partition.topicIdAndLog
+ if (!checkTopicId(requestTopicId, topicIdOpt, partitionLogOpt)) {
Contributor


Maybe I am missing something, but why is it necessary to set the topicId here? I was expecting that we would do this in makeLeader and makeFollower. I'm a tad uncomfortable exposing the Log object through Partition before we have directly associated it.

Member Author


This is in the case of the log already existing without a topic ID. Before I had this, the partition.metadata file was not written on the first LeaderAndIsr request.

Member Author

@jolshan jolshan Mar 17, 2021


This is also the case when restarting the broker, as the log is not yet associated with the Partition.

Member Author


I suppose one option is to directly associate through Partition.createLogIfNotExists when we get or create the partition a few lines earlier. We call this in makeLeaders/makeFollowers as well, so it's no extra work.

Member Author


Hmmm. I guess that makes the log creation in makeLeaders/makeFollowers superfluous. Another option is to write the partition metadata file somewhere else, once we know the log is associated. The log gets associated in partition.createLogIfNotExists, so maybe there? We could check whether the request carried a topic ID and the log does not yet have one, and assign it there. Unfortunately, this is in the raft code path too, but it shouldn't be exercised there since we always assign the topic ID up front.

Contributor


Yes, I was thinking we would update the topicId in createLogIfNotExists.

@jolshan
Member Author

jolshan commented Mar 17, 2021

I cannot think of a reason not to pass the topic id if we have it available. Otherwise, we would need logic to set it again after the log dir is swapped.

@hachikuji This method is only called for ZK controllers, so it should be able to set it, but I will add the topic ID.

@hachikuji
Contributor

@jolshan Yeah, we haven't gotten to implementing JBOD for KIP-500. It shouldn't be too far off though.

Comment thread core/src/main/scala/kafka/cluster/Partition.scala Outdated
Comment thread core/src/main/scala/kafka/log/Log.scala Outdated
Comment thread core/src/main/scala/kafka/cluster/Partition.scala Outdated
Comment thread core/src/main/scala/kafka/log/Log.scala Outdated
- topicId = partitionMetadataFile.read().topicId
+ else {
+   val fileTopicId = partitionMetadataFile.read().topicId
+   if (topicId.isDefined && fileTopicId != topicId.get)
Contributor


You should be able to do something like !topicId.contains(fileTopicId)

Member Author


Seems like if the option is None, !topicId.contains(fileTopicId) will return true, so I'll do topicId.isDefined && !topicId.contains(fileTopicId)
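To see why the extra isDefined guard is needed: for None, contains always returns false, so the negated check alone would report a mismatch even when no topic ID is set at all. A small illustration using Java's Optional, where Scala's topicId.contains(x) is approximated with filter/isPresent (illustrative only, not Kafka code):

```java
import java.util.Optional;

// Demonstrates why a bare negated "contains" check misfires on an empty
// Option: it would flag an inconsistency when no topic ID exists yet.
public class OptionContainsDemo {

    // Rough Java analogue of Scala's Option.contains(value).
    public static boolean contains(Optional<String> opt, String value) {
        return opt.filter(value::equals).isPresent();
    }

    public static void main(String[] args) {
        Optional<String> none = Optional.empty();
        Optional<String> some = Optional.of("id-1");

        // Bare negated check on an empty Option: wrongly signals a mismatch.
        System.out.println(!contains(none, "id-1"));                          // true
        // With the isPresent guard, an empty Option is no longer a mismatch.
        System.out.println(none.isPresent() && !contains(none, "id-1"));      // false
        // A genuinely conflicting ID is still caught.
        System.out.println(some.isPresent() && !contains(some, "id-2"));      // true
    }
}
```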

Comment thread core/src/main/scala/kafka/log/Log.scala Outdated
Comment thread core/src/main/scala/kafka/server/RaftReplicaManager.scala Outdated
Comment thread core/src/main/scala/kafka/server/RaftReplicaManager.scala Outdated
Comment thread core/src/main/scala/kafka/server/ReplicaManager.scala Outdated
Comment thread core/src/main/scala/kafka/server/ReplicaManager.scala Outdated
Comment thread core/src/main/scala/kafka/server/ReplicaManager.scala Outdated
Comment thread core/src/main/scala/kafka/cluster/Partition.scala Outdated
Comment thread core/src/main/scala/kafka/cluster/Partition.scala Outdated
Comment thread core/src/main/scala/kafka/log/Log.scala Outdated
throw new IllegalStateException(s"Tried to assign topic ID $topicId to log, but log already contained topicId $fileTopicId")
topicId = Some(fileTopicId)
}
} else if (topicId.isDefined && keepPartitionMetadataFile) {
Contributor


Ok, I think I get it.
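The load-time behavior in the snippet above — adopt the persisted ID from partition.metadata, but fail if a conflicting ID was passed in — can be sketched like this. A hypothetical Java rendering of the Scala logic; the names are illustrative, not Kafka's actual API:

```java
import java.util.Optional;
import java.util.UUID;

// Hypothetical sketch of topic ID resolution when a log is opened:
// an existing partition.metadata file is authoritative; a conflicting
// caller-supplied ID is an error; otherwise the caller's ID is kept
// (and the file is written later).
public class LoadTopicId {

    public static Optional<UUID> loadTopicId(Optional<UUID> passedId, Optional<UUID> fileId) {
        if (fileId.isPresent()) {
            if (passedId.isPresent() && !passedId.get().equals(fileId.get())) {
                throw new IllegalStateException(
                    "Tried to assign topic ID " + passedId.get()
                    + " to log, but log already contained topic ID " + fileId.get());
            }
            return fileId; // adopt the persisted ID
        }
        return passedId; // no file yet: keep the passed ID
    }

    public static void main(String[] args) {
        UUID a = UUID.randomUUID();
        System.out.println(loadTopicId(Optional.of(a), Optional.empty()).get().equals(a));  // true
        System.out.println(loadTopicId(Optional.empty(), Optional.of(a)).get().equals(a));  // true
    }
}
```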

Comment thread core/src/main/scala/kafka/log/Log.scala Outdated
  highWatermarkCheckpoints: OffsetCheckpoints,
- metadataOffset: Option[Long]): Set[Partition] = {
+ metadataOffset: Option[Long],
+ topicIds: String => Option[Uuid]): Set[Partition] = {
Contributor


I guess we could use a strong type here since topicIds are required for KIP-500, but maybe not worth it since we are delegating to Partition in the end.

Comment thread core/src/main/scala/kafka/server/RaftReplicaManager.scala Outdated
Comment thread core/src/main/scala/kafka/log/LogManager.scala
assertEquals(topicId, partition2.topicId.get)
assertFalse(partition2.log.isDefined)

// Calling makeLeader with a new topic ID should not overwrite the old topic ID. We should get the same log.
Contributor


Do we have test cases for the InconsistentTopicId scenario as well?

Member Author


The InconsistentTopicId scenario is tested in ReplicaManagerTest/RaftReplicaManagerTest, since the inconsistent ID check logic lives in ReplicaManager rather than in Partition.


Comment thread core/src/main/scala/kafka/log/Log.scala Outdated
…ptional, added check for inconsistent topic IDs in getOrCreateLog
Comment thread core/src/main/scala/kafka/log/LogManager.scala Outdated
Comment thread core/src/main/scala/kafka/server/ReplicaManager.scala Outdated
Comment thread core/src/main/scala/kafka/server/ReplicaManager.scala Outdated
Contributor

@hachikuji hachikuji left a comment


LGTM. Thanks for the patch!

@hachikuji hachikuji added the kraft label Apr 1, 2021
@hachikuji hachikuji merged commit 40f001c into apache:trunk Apr 1, 2021
ijuma added a commit to ijuma/kafka that referenced this pull request Apr 4, 2021
…e-allocations-lz4

* apache-github/trunk: (243 commits)
  KAFKA-12590: Remove deprecated kafka.security.auth.Authorizer, SimpleAclAuthorizer and related classes in 3.0 (apache#10450)
  KAFKA-3968: fsync the parent directory of a segment file when the file is created (apache#10405)
  KAFKA-12283: disable flaky testMultipleWorkersRejoining to stabilize build (apache#10408)
  MINOR: remove KTable.to from the docs (apache#10464)
  MONOR: Remove redudant LocalLogManager (apache#10325)
  MINOR: support ImplicitLinkedHashCollection#sort (apache#10456)
  KAFKA-12587 Remove KafkaPrincipal#fromString for 3.0 (apache#10447)
  KAFKA-12426: Missing logic to create partition.metadata files in RaftReplicaManager (apache#10282)
  MINOR: Improve reproducability of raft simulation tests (apache#10422)
  KAFKA-12474: Handle failure to write new session keys gracefully (apache#10396)
  KAFKA-12593: Fix Apache License headers (apache#10452)
  MINOR: Fix typo in MirrorMaker v2 documentation (apache#10433)
  KAFKA-12600: Remove deprecated config value `default` for client config `client.dns.lookup` (apache#10458)
  KAFKA-12952: Remove deprecated LogConfig.Compact (apache#10451)
  Initial commit (apache#10454)
  KAFKA-12575: Eliminate Log.isLogDirOffline boolean attribute (apache#10430)
  KAFKA-8405; Remove deprecated `kafka-preferred-replica-election` command (apache#10443)
  MINOR: Fix docs for end-to-end record latency metrics (apache#10449)
  MINOR Replaced File with Path in LogSegmentData. (apache#10424)
  KAFKA-12583: Upgrade netty to 4.1.62.Final
  ...
wyuka pushed a commit to wyuka/kafka that referenced this pull request Dec 15, 2021
…ReplicaManager (apache#10282)

Reviewers: Ron Dagostino <rdagostino@confluent.io>, Jason Gustafson <jason@confluent.io>