Skip to content

KAFKA-18784: Fix ConsumerWithLegacyMessageFormatIntegrationTest#18889

Merged
chia7712 merged 1 commit intoapache:trunkfrom
FrankYang0529:KAFKA-18784
Feb 17, 2025
Merged

KAFKA-18784: Fix ConsumerWithLegacyMessageFormatIntegrationTest#18889
chia7712 merged 1 commit intoapache:trunkfrom
FrankYang0529:KAFKA-18784

Conversation

@FrankYang0529
Copy link
Copy Markdown
Member

In PR #18267, we removed old message format for cases in ConsumerWithLegacyMessageFormatIntegrationTest. Although test cases can pass, they don't fulfill original purpose. We can't send old message format since 4.0, so I change cases to append old records by ReplicaManager directly.

Committer Checklist (excluded from commit message)

  • Verify design and implementation
  • Verify test coverage and CI build status
  • Verify documentation (including upgrade notes)

@github-actions github-actions Bot added core Kafka Broker tests Test fixes (including flaky tests) small Small PRs labels Feb 13, 2025
Copy link
Copy Markdown
Member

@chia7712 chia7712 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@FrankYang0529 thanks for this patch. one major comment is left.


records.foreach(builder.append)

brokers.filter(_.config.brokerId == brokerId).foreach(b => b.replicaManager.appendRecords(
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

replicaManager.appendRecords performs the conversion if the magic of the batch is not equal to v2. Therefore, to handle older record formats, you should follow @ijuma's suggestion in the jira to utilize the Log class for writing records in the legacy format.

@dajac dajac added the Blocker This pull request is identified as solving a blocker for a release. label Feb 14, 2025
assertEquals(40, timestampTopic2P0.offset)
assertEquals(40, timestampTopic2P0.timestamp)
assertEquals(Optional.of(0), timestampTopic2P0.leaderEpoch)
assertEquals(Optional.empty, timestampTopic2P0.leaderEpoch)
Copy link
Copy Markdown
Member Author

@FrankYang0529 FrankYang0529 Feb 15, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The commit 8cdf1ab included KRaft support for ConsumerWithLegacyMessageFormatIntegrationTest. However, it didn't really set topic to use v0 format. The KRaft used inter broker protocol version 3.0-IV1, so the message format version is ignored. That's why we could get leader epoch for topic2.

Option(topicConfig.getProperty(TopicConfig.MESSAGE_FORMAT_VERSION_CONFIG)).flatMap { versionString =>
val messageFormatVersion = new MessageFormatVersion(versionString, kafkaConfig.interBrokerProtocolVersion.version)
if (messageFormatVersion.shouldIgnore) {
if (messageFormatVersion.shouldWarn)
warn(messageFormatVersion.topicWarningMessage(topic))
Some(TopicConfig.MESSAGE_FORMAT_VERSION_CONFIG)

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes, we don't set the leader epoch for the old message format.

Copy link
Copy Markdown
Member

@chia7712 chia7712 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@FrankYang0529 thanks for this patch. please take a look at following two minor comments

// append legacy records to topic2
appendLegacyRecords(100, tp, 0, part)
} else {
println("sendRecords")
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

please remove this line

assertEquals(40, timestampTopic2P0.offset)
assertEquals(40, timestampTopic2P0.timestamp)
assertEquals(Optional.of(0), timestampTopic2P0.leaderEpoch)
assertEquals(Optional.empty, timestampTopic2P0.leaderEpoch)
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes, we don't set the leader epoch for the old message format.

new SimpleRecord(startingTimestamp + i, s"key $i".getBytes, s"value $i".getBytes)
}
val buffer = ByteBuffer.allocate(AbstractRecords.estimateSizeInBytes(RecordBatch.MAGIC_VALUE_V1, CompressionType.NONE, records.asJava))
val builder = MemoryRecords.builder(buffer, RecordBatch.MAGIC_VALUE_V1, Compression.of(CompressionType.NONE).build,
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you please add test for v0 also? in v0 we don't write the timestamp, so searching record by timestamp should return null.

new SimpleRecord(startingTimestamp + i, s"key $i".getBytes, s"value $i".getBytes)
}
val buffer = ByteBuffer.allocate(AbstractRecords.estimateSizeInBytes(RecordBatch.MAGIC_VALUE_V0, CompressionType.NONE, records.asJava))
val builder = MemoryRecords.builder(buffer, RecordBatch.MAGIC_VALUE_V0, Compression.of(CompressionType.NONE).build,
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you please keep tests for both v0 and v1?

Signed-off-by: PoAn Yang <payang@apache.org>
Copy link
Copy Markdown
Member

@chia7712 chia7712 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@chia7712 chia7712 merged commit 2b6e868 into apache:trunk Feb 17, 2025
chia7712 pushed a commit that referenced this pull request Feb 17, 2025
In PR #18267, we removed old message format for cases in ConsumerWithLegacyMessageFormatIntegrationTest. Although test cases can pass, they don't fulfill original purpose. We can't send old message format since 4.0, so I change cases to append old records by ReplicaManager directly.

Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
@chia7712
Copy link
Copy Markdown
Member

cherry-pick to 4.0

@FrankYang0529 FrankYang0529 deleted the KAFKA-18784 branch February 18, 2025 01:41
manoj-mathivanan pushed a commit to manoj-mathivanan/kafka that referenced this pull request Feb 19, 2025
…e#18889)

In PR apache#18267, we removed old message format for cases in ConsumerWithLegacyMessageFormatIntegrationTest. Although test cases can pass, they don't fulfill original purpose. We can't send old message format since 4.0, so I change cases to append old records by ReplicaManager directly.

Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

Blocker This pull request is identified as solving a blocker for a release. core Kafka Broker tests Test fixes (including flaky tests)

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants