KAFKA-13093: Log compaction should write new segments with record version v2 (KIP-724) #18321

Merged: ijuma merged 20 commits into apache:trunk from ijuma:kafka-13093-log-compaction-write-record-v2 on Jan 9, 2025
Conversation

ijuma (Member) commented Dec 26, 2024:

Convert v0/v1 record batches to v2 during compaction, even if said record batches would
otherwise be written unchanged. A few important details:

  1. A V0 compressed record batch with multiple records is converted into a single V2 record batch.
  2. V0 uncompressed records are converted into single-record V2 record batches.
  3. V0 records are converted to V2 records with timestampType set to CreateTime and the
    timestamp set to -1.
  4. The KAFKA-4298 workaround is no longer needed since the conversion to V2 fixes
    that issue too.
  5. Removed a log warning applicable to consumers older than 0.10.1 - they are no longer
    supported.
  6. Added back the ability to append records with v0/v1 (for testing only).
  7. The creation of the leader epoch cache is no longer optional since the record version
    config is effectively always V2.

Added integration tests; these tests existed before #18267 and were restored, modified and
extended.
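Points 1-3 above can be sketched as follows. This is a hypothetical model in plain Java (the record and batch types here are stand-ins, not Kafka's MemoryRecords API): each v0 batch becomes a single v2 batch, and because v0 records carry no timestamp, the up-converted v2 batch is written with timestampType CreateTime and timestamp -1.

```java
// Hypothetical sketch of the v0 -> v2 up-conversion rules described above.
// Types and names here are illustrative stand-ins, not Kafka's actual API.
import java.util.List;

public class UpConvertSketch {
    static final long NO_TIMESTAMP = -1L; // v0 records have no timestamp field

    record V0Record(byte[] key, byte[] value) {}                 // no timestamp in v0
    record V2Record(long timestamp, byte[] key, byte[] value) {}
    record V2Batch(String timestampType, List<V2Record> records) {}

    // All records of one v0 batch (compressed or a single uncompressed record)
    // are rewritten into exactly one v2 batch.
    static V2Batch upConvert(List<V0Record> v0Records) {
        List<V2Record> converted = v0Records.stream()
            .map(r -> new V2Record(NO_TIMESTAMP, r.key(), r.value()))
            .toList();
        return new V2Batch("CreateTime", converted);
    }

    public static void main(String[] args) {
        V2Batch batch = upConvert(List.of(new V0Record("k".getBytes(), "v".getBytes())));
        System.out.println(batch.timestampType() + " " + batch.records().get(0).timestamp());
        // prints: CreateTime -1
    }
}
```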

Committer Checklist (excluded from commit message)

  • Verify design and implementation
  • Verify test coverage and CI build status
  • Verify documentation (including upgrade notes)

github-actions bot added the triage (PRs from the community), core (Kafka Broker), consumer and clients labels on Dec 26, 2024
ijuma removed the triage (PRs from the community) label on Dec 26, 2024
ijuma force-pushed the kafka-13093-log-compaction-write-record-v2 branch from f38e108 to d41a6e2 on December 27, 2024 18:50
github-actions bot added the storage (Pull requests that target the storage module) label on Dec 27, 2024
}

@Test
def testLeaderEpochCacheCreatedAfterMessageFormatUpgrade(): Unit = {
ijuma (Member Author):

This test was originally removed via #18267 - added it back and modified it to take into account that the leader epoch cache always exists now (since the configured record version is always V2).

@ijuma ijuma requested a review from junrao December 29, 2024 17:23
def listProducerSnapshotOffsets(logDir: File): Seq[Long] =
ProducerStateManager.listSnapshotFiles(logDir).asScala.map(_.offset).sorted.toSeq

def assertLeaderEpochCacheEmpty(log: UnifiedLog): Unit = {
ijuma (Member Author):

Unused.


@ParameterizedTest
@ArgumentsSource(classOf[LogCleanerParameterizedIntegrationTest.ExcludeZstd])
def testCleanerWithMessageFormatV0V1V2(compressionType: CompressionType): Unit = {
ijuma (Member Author):

The tests added in this class were originally removed via #18267 - added them back, modified them to fit the new reality and extended them to include more extensive verification of the log after compaction.

@ijuma ijuma force-pushed the kafka-13093-log-compaction-write-record-v2 branch from 0ede54e to a50bd5d Compare December 29, 2024 17:43
ijuma (Member Author) commented Dec 29, 2024:

@junrao Would you be able to review this PR? I would like to include it in Apache Kafka 4.0, if possible - it completes KIP-724.

junrao (Contributor) left a comment:

@ijuma : Thanks for the PR. Left a few comments.

Comment threads:
  • core/src/main/scala/kafka/log/UnifiedLog.scala (outdated)
  • core/src/main/scala/kafka/log/UnifiedLog.scala
  • core/src/test/scala/unit/kafka/log/LogCleanerParameterizedIntegrationTest.scala (outdated)
ijuma added 2 commits January 7, 2025 19:38
…og-compaction-write-record-v2

* apache-github/trunk: (34 commits)
  MINOR: Bump year to 2025 in NOTICE file (apache#18427)
  KAFKA-18411 Remove ZkProducerIdManager (apache#18413)
  KAFKA-18408 tweak the 'tag' field for BrokerHeartbeatRequest.json, BrokerRegistrationChangeRecord.json and RegisterBrokerRecord.json (apache#18421)
  KAFKA-18414 Remove KRaftRegistrationResult (apache#18401)
  KAFKA-17921 Support SASL_PLAINTEXT protocol with java.security.auth.login.config (apache#17671)
  KAFKA-18384 Remove ZkAlterPartitionManager (apache#18364)
  KAFKA-10790: Add deadlock detection to producer#flush (apache#17946)
  KAFKA-18412: Remove EmbeddedZookeeper (apache#18399)
  MINOR : Improve Exception log in NotEnoughReplicasException(apache#12394)
  MINOR: Improve PlaintextAdminIntegrationTest#testConsumerGroups (apache#18409)
  MINOR: Remove unused local variable (apache#18410)
  MINOR: Remove RaftManager.maybeDeleteMetadataLogDir and AutoTopicCreationManagerTest.scala (apache#17365)
  KAFKA-18368 Remove TestUtils#MockZkConnect and remove zkConnect from TestUtils#createBrokerConfig (apache#18352)
  MINOR: Update Consumer group timeout default to 30 sec (apache#16406)
  MINOR: Fix typo in CommitRequestManager (apache#18407)
  MINOR: cleanup JavaDocs for deprecation warnings (apache#18402)
  KAFKA-18303; Update ShareCoordinator to use new record format (apache#18396)
  MINOR: Update Consumer and Producer JavaDocs for committing offsets (apache#18336)
  KAFKA-16446: Improve controller event duration logging (apache#15622)
  KAFKA-18388 test-kraft-server-start.sh should use log4j2.yaml (apache#18370)
  ...
github-actions bot added the tiered-storage (Related to the Tiered Storage feature) label on Jan 8, 2025
}

@Test
def testMaybeAddLogDirFetchersWithoutEpochCache(): Unit = {
ijuma (Member Author):

@junrao do you agree that we don't need this test anymore? Or is there a purpose that is not made clear by the test name?

junrao (Contributor):

This test is still useful. We can just leave partition.futureLog.get.leaderEpochCache unchanged. It will be initialized as empty and the fetcher thread will default to initializing the fetch offset with HWM. We probably want to adjust the test name accordingly.
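The fallback described above can be sketched as follows. The names here (initialFetchOffset, latestEpochEndOffset) are hypothetical, not Kafka's actual fetcher API: when the leader epoch cache is empty, the follower initializes its fetch offset from the high watermark instead of the epoch-based end offset.

```java
// Illustrative sketch (hypothetical names, not Kafka's fetcher code) of the
// behaviour described in the comment above: with an empty leader epoch cache,
// the fetcher defaults the initial fetch offset to the high watermark.
import java.util.OptionalLong;

public class FetchOffsetSketch {
    static long initialFetchOffset(OptionalLong latestEpochEndOffset, long highWatermark) {
        // Use epoch-based truncation when the cache has entries; otherwise HWM.
        return latestEpochEndOffset.orElse(highWatermark);
    }

    public static void main(String[] args) {
        System.out.println(initialFetchOffset(OptionalLong.empty(), 42L));  // prints: 42
        System.out.println(initialFetchOffset(OptionalLong.of(100L), 42L)); // prints: 100
    }
}
```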


if (logOptional.isPresent()) {
Option<LeaderEpochFileCache> leaderEpochCache = logOptional.get().leaderEpochCache();
if (leaderEpochCache != null && leaderEpochCache.isDefined()) {
ijuma (Member Author):

@junrao It's odd that we were checking for null here - do you know if we can remove this?

junrao (Contributor):

Yes, we can remove this check.
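The simplification agreed above can be illustrated with plain java.util.Optional (a stand-in for the Scala Option in the snippet): once the leaderEpochCache reference is guaranteed non-null, the null guard is dead code and an emptiness check alone suffices.

```java
// Illustrative sketch, not Kafka code: once the cache reference is always a
// non-null Optional, `ref != null && ref.isPresent()` collapses to `ref.isPresent()`.
import java.util.Optional;

public class NullCheckSketch {
    // Hypothetical stand-in for the log's leader epoch cache reference.
    static Optional<String> leaderEpochCache = Optional.empty();

    static boolean hasCache() {
        // Before: leaderEpochCache != null && leaderEpochCache.isPresent()
        // After:  the reference is never null, so only emptiness matters.
        return leaderEpochCache.isPresent();
    }

    public static void main(String[] args) {
        System.out.println(hasCache());          // prints: false
        leaderEpochCache = Optional.of("cache");
        System.out.println(hasCache());          // prints: true
    }
}
```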

leaderEpochCache.foreach(_.clear())
// `renameDir` with `shouldReinitialize=false` sets this to `null` and it's usually (but not always) called before this method
if (leaderEpochCache != null)
leaderEpochCache.clear()
ijuma (Member Author):

@junrao This is interesting - do you think we should be setting the cache to null here too versus calling clear on it? It's a bit odd that we take different actions to achieve a similar outcome (clearing the in-memory state) in two different (but related) methods.

ijuma (Member Author) commented Jan 8, 2025:

The case where we don't call renameDir before delete is during LogManager.loadLog.

if (logDir.getName.endsWith(UnifiedLog.DeleteDirSuffix)) {
  addLogToBeDeleted(log)
}

junrao (Contributor):

Yes, I agree that we can just call clear on leaderEpochCache instead of setting it to null in renameDir.
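The design point settled above can be sketched with a toy cache class (hypothetical, not Kafka's LeaderEpochFileCache): clearing the in-memory state keeps the reference valid, so later code paths, such as delete after LogManager.loadLog where renameDir is never called, need no null guard.

```java
// Hypothetical sketch of clear-vs-null: clearing keeps the reference usable,
// while nulling it forces every downstream caller to add a null check.
import java.util.ArrayList;
import java.util.List;

public class ClearVsNullSketch {
    static class EpochCache {
        private final List<long[]> entries = new ArrayList<>(); // {epoch, startOffset} pairs
        void assign(long epoch, long startOffset) { entries.add(new long[]{epoch, startOffset}); }
        void clear() { entries.clear(); }
        int size() { return entries.size(); }
    }

    public static void main(String[] args) {
        EpochCache cache = new EpochCache();
        cache.assign(5, 100);
        // Option A (renameDir today): cache = null -> later uses need a null check.
        // Option B (agreed above): cache.clear()   -> reference stays usable.
        cache.clear();
        System.out.println(cache.size()); // prints: 0
    }
}
```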

junrao (Contributor) left a comment:

@ijuma : Thanks for the updated PR. A few more minor comments.

private def initializeLeaderEpochCache(): Unit = lock synchronized {
leaderEpochCache = UnifiedLog.maybeCreateLeaderEpochCache(
dir, topicPartition, logDirFailureChannel, logIdent, leaderEpochCache, scheduler)
leaderEpochCache = UnifiedLog.createLeaderEpochCache(
junrao (Contributor):

This is an existing issue: initializeLeaderEpochCache would be better named reinitializeLeaderEpochCache.

ijuma (Member Author) commented Jan 9, 2025:

@junrao Addressed your comments and the tests are passing.

assertTrue(log.partitionMetadataFile.isEmpty)
assertEquals(0, log.logEndOffset)
// verify that records appending can still succeed
// even with the uninitialized leaderEpochCache and partitionMetadataFile
ijuma (Member Author):

We don't support appending with an uninitialized leaderEpochCache anymore - it was questionable to support this after renameDir(reinitialize=false), since that call happens as part of deleting the log.

junrao (Contributor) left a comment:

@ijuma : Thanks for the updated PR. LGTM.

Should we also remove the following methods in MetadataVersion?
  • highestSupportedRecordVersion
  • isOffsetForLeaderEpochSupported
  • isSaslInterBrokerHandshakeRequestEnabled

ijuma (Member Author) commented Jan 9, 2025:

@junrao Yes, good idea. Is it ok if I do that as a follow-up? It doesn't have to go into 4.0 and there are many references.

junrao (Contributor) left a comment:

@ijuma : We can do the follow-up in a separate PR. This PR LGTM.

@ijuma ijuma merged commit cf7029c into apache:trunk Jan 9, 2025
@ijuma ijuma deleted the kafka-13093-log-compaction-write-record-v2 branch January 9, 2025 17:37
ijuma added a commit that referenced this pull request Jan 9, 2025
…sion v2 (KIP-724) (#18321)

Reviewers: Jun Rao <jun@confluent.io>
ijuma (Member Author) commented Jan 10, 2025:

@junrao Submitted the follow-up PR: #18468

m1a2st pushed a commit to m1a2st/kafka that referenced this pull request Jan 10, 2025
pranavt84 pushed a commit to pranavt84/kafka that referenced this pull request Jan 27, 2025
manoj-mathivanan pushed a commit to manoj-mathivanan/kafka that referenced this pull request Feb 19, 2025

Labels

clients, consumer, core (Kafka Broker), storage (Pull requests that target the storage module), tiered-storage (Related to the Tiered Storage feature)

Projects

None yet

2 participants