KAFKA-17393: Remove log.message.format.version/message.format.version (KIP-724)#18267

Merged
ijuma merged 8 commits into apache:trunk from FrankYang0529:KAFKA-17393 on Dec 21, 2024
Conversation

@FrankYang0529 (Member) commented Dec 19, 2024

Based on KIP-724, the log.message.format.version and message.format.version can be removed in 4.0.

These configs have effectively been a no-op with inter-broker protocol version 3.0 or higher
since Apache Kafka 3.0, so the impact should be minimal.
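To make the "no-op" claim concrete, here is a minimal standalone sketch (not Kafka source; the helper and the float IBP model are hypothetical): once the inter-broker protocol (IBP) reaches 3.0, the broker always writes the v2 record format (magic value 2), so whatever `message.format.version` asked for is ignored.

```java
// Illustrative sketch only, not Kafka source code. Models why
// message.format.version became a no-op with IBP >= 3.0: the broker pins
// the record format (magic byte) to v2 regardless of the configured value.
public class EffectiveMagicSketch {
    static final byte MAGIC_V2 = 2;

    // Hypothetical helper: configuredMagic stands in for the magic value
    // that message.format.version would have requested.
    public static byte effectiveMagic(float ibpVersion, byte configuredMagic) {
        if (ibpVersion >= 3.0f) {
            return MAGIC_V2; // config ignored: always written as record format v2
        }
        return configuredMagic; // pre-3.0 brokers honored the configured format
    }

    public static void main(String[] args) {
        // With IBP 3.0+, asking for the old v1 format still yields v2.
        System.out.println(effectiveMagic(3.0f, (byte) 1)); // prints 2
    }
}
```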

Committer Checklist (excluded from commit message)

  • Verify design and implementation
  • Verify test coverage and CI build status
  • Verify documentation (including upgrade notes)

@github-actions github-actions Bot added core Kafka Broker tools storage Pull requests that target the storage module clients labels Dec 19, 2024
@FrankYang0529 force-pushed the KAFKA-17393 branch 3 times, most recently from b542a90 to a673575, on December 21, 2024 03:53
@ijuma changed the title from "KAFKA-17393: Remove message.format.version and in TopicConfig" to "KAFKA-17393: Remove log.message.format.version and message.format.version" on Dec 21, 2024
@ijuma (Member) left a comment

Thanks for the PR. It's looking pretty good. I left a few comments.

// Validate the configurations.
val configNamesToExclude = excludedConfigs(topic, topicConfig)
val props = new Properties()
topicConfig.asScala.foreachEntry { (key, value) =>
@ijuma (Member):

Is this code needed at all now?

@@ -436,7 +375,7 @@ private Optional<Compression> getCompression() {
}

public RecordVersion recordVersion() {
@ijuma (Member):

Shall we remove this method? It doesn't make sense now that we don't have a log config for record version.

Comment thread on docs/upgrade.html (outdated):
</li>
<li>The <code>org.apache.kafka.clients.producer.internals.DefaultPartitioner</code> and <code>org.apache.kafka.clients.producer.UniformStickyPartitioner</code> classes were removed.
</li>
<li>The <code>log.message.format.version</code> and <code>message.format.version</code> were removed.
@ijuma (Member):

We should add the word configs before were removed.

}

@nowarn("cat=deprecation")
def setIbpAndMessageFormatVersions(config: Properties, version: MetadataVersion): Unit = {
@ijuma (Member):

We should rename this method.

@@ -113,9 +106,7 @@ class ConsumerWithLegacyMessageFormatIntegrationTest extends AbstractConsumerTes
prop.setProperty(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "false")
@ijuma (Member):

Shall we remove this property and comment? Also, the topic name should be updated.

Signed-off-by: PoAn Yang <payang@apache.org>
@FrankYang0529 force-pushed the KAFKA-17393 branch 3 times, most recently from 9bb186f to 045dd80, on December 21, 2024 07:57
@ijuma (Member) left a comment

Thanks for the updates, just a couple more comments.

MemoryRecordsBuilder builder = new MemoryRecordsBuilder(
    buffer,
    magic,
    RecordVersion.V2.value,
@ijuma (Member):

Perhaps use RecordBatch.CURRENT_MAGIC_VALUE here and other similar places.

def getLogConfig(topicPartition: TopicPartition): Option[LogConfig] = localLog(topicPartition).map(_.config)

def getMagic(topicPartition: TopicPartition): Option[Byte] = getLogConfig(topicPartition).map(_.recordVersion.value)
def getMagic(topicPartition: TopicPartition): Option[Byte] = getLogConfig(topicPartition).map(_ => RecordVersion.V2.value)
@ijuma (Member):

Do we still need this method?
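The diff above shows why the question comes up: `getMagic` now maps the partition's config to a constant, so the lookup adds nothing. A minimal Java analogue (a sketch only; the `Optional<String>` config stand-in and the constant are assumptions, not Kafka source):

```java
import java.util.Optional;

// Sketch, not Kafka source: once only record format v2 can be written,
// a per-partition "magic" lookup collapses into a constant, and callers
// can use the constant (RecordBatch.CURRENT_MAGIC_VALUE in Kafka) directly.
public class GetMagicSketch {
    static final byte CURRENT_MAGIC_VALUE = 2; // stand-in for RecordBatch.CURRENT_MAGIC_VALUE

    // Hypothetical stand-in for looking up a partition's LogConfig.
    public static Optional<String> getLogConfig(String topicPartition) {
        return topicPartition.isEmpty() ? Optional.empty() : Optional.of("config");
    }

    // Mirrors the diff: the config is fetched only to be mapped to a constant.
    public static Optional<Byte> getMagic(String topicPartition) {
        return getLogConfig(topicPartition).map(c -> CURRENT_MAGIC_VALUE);
    }

    public static void main(String[] args) {
        System.out.println(getMagic("t-0")); // present, always 2
        System.out.println(getMagic(""));    // empty: unknown partition
    }
}
```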

@FrankYang0529 (Member, Author) replied Dec 21, 2024:

Thanks for the review and suggestions. I tried removing ReplicaManager#getMagic, and it looks like some related logic on this path can be removed too. Can I also remove it? Thanks.

public static int estimateSizeInBytes(byte magic,
                                      long baseOffset,
                                      CompressionType compressionType,
                                      Iterable<Record> records) {
    int size = 0;
    if (magic <= RecordBatch.MAGIC_VALUE_V1) {
        for (Record record : records)
            size += Records.LOG_OVERHEAD + LegacyRecord.recordSize(magic, record.key(), record.value());
    } else {
        size = DefaultRecordBatch.sizeInBytes(baseOffset, records);
    }
    return estimateCompressedSizeInBytes(size, compressionType);
}

public static int estimateSizeInBytes(byte magic,
                                      CompressionType compressionType,
                                      Iterable<SimpleRecord> records) {
    int size = 0;
    if (magic <= RecordBatch.MAGIC_VALUE_V1) {
        for (SimpleRecord record : records)
            size += Records.LOG_OVERHEAD + LegacyRecord.recordSize(magic, record.key(), record.value());
    } else {
        size = DefaultRecordBatch.sizeInBytes(records);
    }
    return estimateCompressedSizeInBytes(size, compressionType);
}

public MemoryRecordsBuilder(ByteBufferOutputStream bufferStream,
                            byte magic,
                            Compression compression,
                            TimestampType timestampType,
                            long baseOffset,
                            long logAppendTime,
                            long producerId,
                            short producerEpoch,
                            int baseSequence,
                            boolean isTransactional,
                            boolean isControlBatch,
                            int partitionLeaderEpoch,
                            int writeLimit,
                            long deleteHorizonMs) {
    if (magic > RecordBatch.MAGIC_VALUE_V0 && timestampType == TimestampType.NO_TIMESTAMP_TYPE)
        throw new IllegalArgumentException("TimestampType must be set for magic >= 0");
    if (magic < RecordBatch.MAGIC_VALUE_V2) {
        if (isTransactional)
            throw new IllegalArgumentException("Transactional records are not supported for magic " + magic);
        if (isControlBatch)
            throw new IllegalArgumentException("Control records are not supported for magic " + magic);
        if (compression.type() == CompressionType.ZSTD)
            throw new IllegalArgumentException("ZStandard compression is not supported for magic " + magic);
        if (deleteHorizonMs != RecordBatch.NO_TIMESTAMP)
            throw new IllegalArgumentException("Delete horizon timestamp is not supported for magic " + magic);
    }

@ijuma (Member):

Perhaps we can leave the changes to the records for a separate PR. One thing we have to be careful about is that old record formats may exist on disk - so the functionality to handle that needs to remain.
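The concern about old formats on disk can be illustrated with a small standalone sketch (assumptions only, not Kafka source): every on-disk batch carries its own magic byte, so a reader must keep dispatching on it even though all new writes are v2.

```java
// Sketch, not Kafka source: why read-path handling for old record formats
// must remain. Segments written before an upgrade may still contain v0/v1
// batches, so decoding dispatches on the batch's stored magic byte.
public class BatchDispatchSketch {
    public static String describe(byte magic) {
        switch (magic) {
            case 0:  return "legacy v0 (no timestamps)";
            case 1:  return "legacy v1 (timestamps, no headers)";
            case 2:  return "v2 (record batches with headers)";
            default: throw new IllegalArgumentException("unknown magic " + magic);
        }
    }

    public static void main(String[] args) {
        // An old segment on disk may still hand the reader a v0 batch.
        System.out.println(describe((byte) 0));
        System.out.println(describe((byte) 2));
    }
}
```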

Signed-off-by: PoAn Yang <payang@apache.org>
@FrankYang0529 force-pushed the KAFKA-17393 branch 2 times, most recently from 8ff57db to 53cf9f0, on December 21, 2024 10:09
Signed-off-by: PoAn Yang <payang@apache.org>
val offsetKey = GroupMetadataManager.readMessageKey(message.key).asInstanceOf[OffsetKey]
assertEquals(groupId, offsetKey.key.group)
assertEquals("foo", offsetKey.key.topicPartition.topic)
}
Member:

Why is this no longer needed?

Member:

I added back this code and made the test pass by returning the right partition from replica manager.

@Test
def shouldRespondWithUnsupportedMessageFormatForBadPartitionAndNoErrorsForGoodPartition(): Unit = {
val tp1 = new TopicPartition("t", 0)
def shouldRespondWithNoErrorsForGoodPartition(): Unit = {
Member:

I think we can delete this test. shouldRespondWithUnknownTopicOrPartitionForBadPartitionAndNoErrorsForGoodPartition seems to cover this path already.

Member:

Pushed a commit that does this.

@ijuma (Member) commented Dec 21, 2024

I left a couple of comments above and pushed a commit with a few improvements. Outside of the two comments, I think we're good.


groupMetadataManager.cleanupGroupMetadata()

verify(partition).appendRecordsToLeader(any[MemoryRecords],
Member:

Also added this back (and fixed the test).

(removedOffsets, group.is(Dead), group.generationId)
}

val offsetsPartition = partitionFor(groupId)
Member:

The indenting of this existing code was wrong, fixed it as part of this change.

@ijuma (Member) left a comment

LGTM with the latest updates.

@ijuma (Member) commented Dec 21, 2024

@FrankYang0529 If the tests pass, I'll go ahead and merge. Please take a look when you have a chance and let me know if you see any issues with my updates.

@ijuma changed the title from "KAFKA-17393: Remove log.message.format.version and message.format.version" to "KAFKA-17393: Remove log.message.format.version and message.format.version (KIP-724)" on Dec 21, 2024
@ijuma changed the title from "KAFKA-17393: Remove log.message.format.version and message.format.version (KIP-724)" to "KAFKA-17393: Remove log.message.format.version/message.format.version (KIP-724)" on Dec 21, 2024
@ijuma ijuma merged commit b4be178 into apache:trunk Dec 21, 2024
@FrankYang0529 FrankYang0529 deleted the KAFKA-17393 branch December 22, 2024 02:36
@FrankYang0529 (Member, Author) commented:

Hi @ijuma, thanks for reviewing and updating the PR. I think the change is good. 👍

ijuma pushed a commit that referenced this pull request Dec 22, 2024
… (KIP-724) (#18267)

Based on [KIP-724](https://cwiki.apache.org/confluence/display/KAFKA/KIP-724%3A+Drop+support+for+message+formats+v0+and+v1), the `log.message.format.version` and `message.format.version` can be removed in 4.0.

These configs have effectively been a no-op with inter-broker protocol version 3.0 or higher
since Apache Kafka 3.0, so the impact should be minimal.

Reviewers: Ismael Juma <ismael@juma.me.uk>
tedyu pushed a commit to tedyu/kafka that referenced this pull request Jan 6, 2025
@@ -223,11 +222,6 @@ public boolean renameDir(String name) {
public void updateConfig(LogConfig newConfig) {
LogConfig oldConfig = config;
Member:

we don't need this local variable now - maybe worth a minor PR to clean it up :)

ijuma added a commit that referenced this pull request Jan 9, 2025
…sion v2 (KIP-724) (#18321)

Convert v0/v1 record batches to v2 during compaction even if said record batches would be
written with no change otherwise. A few important details:

1. V0 compressed record batch with multiple records is converted into single V2 record batch
2. V0 uncompressed records are converted into single record V2 record batches
3. V0 records are converted to V2 records with timestampType set to `CreateTime` and the
timestamp is `-1`.
4. The `KAFKA-4298` workaround is no longer needed since the conversion to V2 fixes
the issue too.
5. Removed a log warning applicable to consumers older than 0.10.1 - they are no longer
supported.
6. Added back the ability to append records with v0/v1 (for testing only).
7. The creation of the leader epoch cache is no longer optional since the record version
config is effectively always V2.

Add integration tests, these tests existed before #18267 - restored, modified and
extended them.

Reviewers: Jun Rao <jun@confluent.io>
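The compaction-time upgrade described in the commit message above can be sketched with a simplified model (assumptions only, not Kafka source; the `Batch` record and `maybeUpgrade` helper are hypothetical): any batch with magic below 2 is rewritten as v2, with v0 timestamps becoming `-1` as noted in point 3.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified sketch of the compaction-time format upgrade, not Kafka source:
// v0/v1 batches are rewritten as v2 even when the records themselves would
// otherwise be kept unchanged.
public class CompactionUpgradeSketch {
    public record Batch(byte magic, long timestamp, List<String> records) {}

    public static Batch maybeUpgrade(Batch b) {
        if (b.magic() >= 2) return b; // already v2: no rewrite needed
        // v0 records carry no timestamp; per the commit notes they become
        // CreateTime with timestamp -1 after conversion.
        long ts = (b.magic() == 0) ? -1L : b.timestamp();
        return new Batch((byte) 2, ts, new ArrayList<>(b.records()));
    }

    public static void main(String[] args) {
        Batch v0 = new Batch((byte) 0, 0L, List.of("k1", "k2"));
        Batch upgraded = maybeUpgrade(v0);
        System.out.println(upgraded.magic());     // 2
        System.out.println(upgraded.timestamp()); // -1
    }
}
```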
ijuma added a commit that referenced this pull request Jan 9, 2025
m1a2st pushed a commit to m1a2st/kafka that referenced this pull request Jan 10, 2025
pranavt84 pushed a commit to pranavt84/kafka that referenced this pull request Jan 27, 2025
chia7712 pushed a commit that referenced this pull request Feb 17, 2025
In PR #18267, we removed the old message format from the cases in ConsumerWithLegacyMessageFormatIntegrationTest. Although the test cases still pass, they no longer fulfill their original purpose. Since 4.0 we can't produce in the old message format, so the cases were changed to append old-format records via ReplicaManager directly.

Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
chia7712 pushed a commit that referenced this pull request Feb 17, 2025
manoj-mathivanan pushed a commit to manoj-mathivanan/kafka that referenced this pull request Feb 19, 2025
manoj-mathivanan pushed a commit to manoj-mathivanan/kafka that referenced this pull request Feb 19, 2025
…e#18889)
