From 73451389e36a0dbcb5e1cad77e08812b318c91fe Mon Sep 17 00:00:00 2001 From: Haruki Okada Date: Sun, 19 May 2024 22:30:20 +0900 Subject: [PATCH 01/11] KAFKA-16541 Fix potential leader-epoch checkpoint file corruption --- .../src/main/scala/kafka/log/UnifiedLog.scala | 37 ++++----- .../checkpoints/OffsetCheckpointFile.scala | 2 +- .../log/remote/RemoteLogManagerTest.java | 55 +++++++------- .../kafka/cluster/PartitionLockTest.scala | 3 +- .../unit/kafka/cluster/PartitionTest.scala | 3 +- .../kafka/log/LogCleanerManagerTest.scala | 3 +- .../scala/unit/kafka/log/LogCleanerTest.scala | 5 +- .../scala/unit/kafka/log/LogLoaderTest.scala | 21 +++-- .../scala/unit/kafka/log/LogSegmentTest.scala | 5 +- .../kafka/server/ReplicaManagerTest.scala | 5 +- ...CheckpointFileWithFailureHandlerTest.scala | 2 +- .../epoch/LeaderEpochFileCacheTest.scala | 11 +-- .../unit/kafka/utils/SchedulerTest.scala | 3 +- .../kafka/server/common/CheckpointFile.java | 8 +- .../CheckpointFileWithFailureHandler.java | 4 +- .../InMemoryLeaderEpochCheckpoint.java | 2 +- .../checkpoint/LeaderEpochCheckpoint.java | 8 +- .../checkpoint/LeaderEpochCheckpointFile.java | 6 +- .../internals/epoch/LeaderEpochFileCache.java | 76 ++++++++++++++++--- 19 files changed, 161 insertions(+), 98 deletions(-) diff --git a/core/src/main/scala/kafka/log/UnifiedLog.scala b/core/src/main/scala/kafka/log/UnifiedLog.scala index eb6fc517a3dc1..21c0ee6c2a5c1 100644 --- a/core/src/main/scala/kafka/log/UnifiedLog.scala +++ b/core/src/main/scala/kafka/log/UnifiedLog.scala @@ -521,7 +521,8 @@ class UnifiedLog(@volatile var logStartOffset: Long, } private def initializeLeaderEpochCache(): Unit = lock synchronized { - leaderEpochCache = UnifiedLog.maybeCreateLeaderEpochCache(dir, topicPartition, logDirFailureChannel, recordVersion, logIdent) + leaderEpochCache = UnifiedLog.maybeCreateLeaderEpochCache( + dir, topicPartition, logDirFailureChannel, recordVersion, logIdent, leaderEpochCache, scheduler) } private def updateHighWatermarkWithLogEndOffset(): Unit = { @@ -2003,12 +2004,17 @@ object UnifiedLog extends Logging { Files.createDirectories(dir.toPath) val topicPartition = UnifiedLog.parseTopicPartitionName(dir) val segments = new LogSegments(topicPartition) + // The created leaderEpochCache will be truncated by LogLoader if necessary + // so it is guaranteed that the epoch entries will be correct even when on-disk + // checkpoint was stale (due to async nature of LeaderEpochFileCache#truncateFromStart/End). val leaderEpochCache = UnifiedLog.maybeCreateLeaderEpochCache( dir, topicPartition, logDirFailureChannel, config.recordVersion, - s"[UnifiedLog partition=$topicPartition, dir=${dir.getParent}] ") + s"[UnifiedLog partition=$topicPartition, dir=${dir.getParent}] ", + None, + scheduler) val producerStateManager = new ProducerStateManager(topicPartition, dir, maxTransactionTimeoutMs, producerStateManagerConfig, time) val isRemoteLogEnabled = UnifiedLog.isRemoteLogEnabled(remoteStorageSystemEnable, config, topicPartition.topic) @@ -2095,7 +2101,8 @@ object UnifiedLog extends Logging { } /** - * If the recordVersion is >= RecordVersion.V2, then create and return a LeaderEpochFileCache. + * If the recordVersion is >= RecordVersion.V2, then create a new LeaderEpochFileCache instance + * or update current cache if any with the new checkpoint and return it. * Otherwise, the message format is considered incompatible and the existing LeaderEpoch file * is deleted. * @@ -2104,33 +2111,29 @@ object UnifiedLog extends Logging { * @param logDirFailureChannel The LogDirFailureChannel to asynchronously handle log dir failure * @param recordVersion The record version * @param logPrefix The logging prefix + * @param currentCache The current LeaderEpochFileCache instance (if any) + * @param scheduler The scheduler for executing asynchronous tasks * @return The new LeaderEpochFileCache instance (if created), none otherwise */ def maybeCreateLeaderEpochCache(dir: File, topicPartition: TopicPartition, logDirFailureChannel: LogDirFailureChannel, recordVersion: RecordVersion, - logPrefix: String): Option[LeaderEpochFileCache] = { + logPrefix: String, + currentCache: Option[LeaderEpochFileCache], + scheduler: Scheduler): Option[LeaderEpochFileCache] = { val leaderEpochFile = LeaderEpochCheckpointFile.newFile(dir) - def newLeaderEpochFileCache(): LeaderEpochFileCache = { - val checkpointFile = new LeaderEpochCheckpointFile(leaderEpochFile, logDirFailureChannel) - new LeaderEpochFileCache(topicPartition, checkpointFile) - } - if (recordVersion.precedes(RecordVersion.V2)) { - val currentCache = if (leaderEpochFile.exists()) - Some(newLeaderEpochFileCache()) - else - None - - if (currentCache.exists(_.nonEmpty)) + if (leaderEpochFile.exists()) { warn(s"${logPrefix}Deleting non-empty leader epoch cache due to incompatible message format $recordVersion") - + } Files.deleteIfExists(leaderEpochFile.toPath) None } else { - Some(newLeaderEpochFileCache()) + val checkpointFile = new LeaderEpochCheckpointFile(leaderEpochFile, logDirFailureChannel) + currentCache.map(_.withCheckpoint(checkpointFile)) + .orElse(Some(new LeaderEpochFileCache(topicPartition, checkpointFile, scheduler))) } } diff --git a/core/src/main/scala/kafka/server/checkpoints/OffsetCheckpointFile.scala b/core/src/main/scala/kafka/server/checkpoints/OffsetCheckpointFile.scala index de3283d21fd42..084e46c5ef266 100644 --- a/core/src/main/scala/kafka/server/checkpoints/OffsetCheckpointFile.scala +++ b/core/src/main/scala/kafka/server/checkpoints/OffsetCheckpointFile.scala @@ -68,7 +68,7 @@ class OffsetCheckpointFile(val file: File, logDirFailureChannel: LogDirFailureCh def write(offsets: Map[TopicPartition, Long]): Unit = { val list: java.util.List[(TopicPartition, Long)] = new java.util.ArrayList[(TopicPartition, Long)](offsets.size) offsets.foreach(x => list.add(x)) - checkpoint.write(list, true) + checkpoint.write(list) } def read(): Map[TopicPartition, Long] = { diff --git a/core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java b/core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java index 193b5c15d68e5..6d895562de735 100644 --- a/core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java +++ b/core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java @@ -57,6 +57,7 @@ import org.apache.kafka.server.log.remote.storage.RemoteStorageManager.IndexType; import org.apache.kafka.server.metrics.KafkaMetricsGroup; import org.apache.kafka.server.metrics.KafkaYammerMetrics; +import org.apache.kafka.server.util.MockScheduler; import org.apache.kafka.storage.internals.checkpoint.InMemoryLeaderEpochCheckpoint; import org.apache.kafka.storage.internals.checkpoint.LeaderEpochCheckpoint; import org.apache.kafka.storage.internals.epoch.LeaderEpochFileCache; @@ -189,7 +190,7 @@ public class RemoteLogManagerTest { List epochs = Collections.emptyList(); @Override - public void write(Collection epochs, boolean ignored) { + public void write(Collection epochs) { this.epochs = new ArrayList<>(epochs); } @@ -202,6 +203,8 @@ public List read() { private final UnifiedLog mockLog = mock(UnifiedLog.class); + private final MockScheduler scheduler = new MockScheduler(time); + @BeforeEach void setUp() throws Exception { topicIds.put(leaderTopicIdPartition.topicPartition().topic(), leaderTopicIdPartition.topicId()); @@ -241,7 +244,7 @@ void tearDown() { @Test void testGetLeaderEpochCheckpoint() { checkpoint.write(totalEpochEntries); - LeaderEpochFileCache cache = new LeaderEpochFileCache(tp, checkpoint); + LeaderEpochFileCache cache = new LeaderEpochFileCache(tp, checkpoint, scheduler); when(mockLog.leaderEpochCache()).thenReturn(Option.apply(cache)); InMemoryLeaderEpochCheckpoint inMemoryCheckpoint = remoteLogManager.getLeaderEpochCheckpoint(mockLog, 0, 300); assertEquals(totalEpochEntries, inMemoryCheckpoint.read()); @@ -259,7 +262,7 @@ void testFindHighestRemoteOffsetOnEmptyRemoteStorage() throws RemoteStorageExce new EpochEntry(1, 500) ); checkpoint.write(totalEpochEntries); - LeaderEpochFileCache cache = new LeaderEpochFileCache(tp, checkpoint); + LeaderEpochFileCache cache = new LeaderEpochFileCache(tp, checkpoint, scheduler); when(mockLog.leaderEpochCache()).thenReturn(Option.apply(cache)); TopicIdPartition tpId = new TopicIdPartition(Uuid.randomUuid(), tp); OffsetAndEpoch offsetAndEpoch = remoteLogManager.findHighestRemoteOffset(tpId, mockLog); @@ -273,7 +276,7 @@ void testFindHighestRemoteOffset() throws RemoteStorageException { new EpochEntry(1, 500) ); checkpoint.write(totalEpochEntries); - LeaderEpochFileCache cache = new LeaderEpochFileCache(tp, checkpoint); + LeaderEpochFileCache cache = new LeaderEpochFileCache(tp, checkpoint, scheduler); when(mockLog.leaderEpochCache()).thenReturn(Option.apply(cache)); TopicIdPartition tpId = new TopicIdPartition(Uuid.randomUuid(), tp); when(remoteLogMetadataManager.highestOffsetForEpoch(eq(tpId), anyInt())).thenAnswer(ans -> { @@ -296,7 +299,7 @@ void testFindHighestRemoteOffsetWithUncleanLeaderElection() throws RemoteStorage new EpochEntry(2, 300) ); checkpoint.write(totalEpochEntries); - LeaderEpochFileCache cache = new LeaderEpochFileCache(tp, checkpoint); + LeaderEpochFileCache cache = new LeaderEpochFileCache(tp, checkpoint, scheduler); when(mockLog.leaderEpochCache()).thenReturn(Option.apply(cache)); TopicIdPartition tpId = new TopicIdPartition(Uuid.randomUuid(), tp); when(remoteLogMetadataManager.highestOffsetForEpoch(eq(tpId), anyInt())).thenAnswer(ans -> { @@ -457,7 +460,7 @@ private void assertCopyExpectedLogSegmentsToRemote(long oldSegmentStartOffset, // leader epoch preparation checkpoint.write(totalEpochEntries); - LeaderEpochFileCache cache = new LeaderEpochFileCache(leaderTopicIdPartition.topicPartition(), checkpoint); + LeaderEpochFileCache cache = new LeaderEpochFileCache(leaderTopicIdPartition.topicPartition(), checkpoint, scheduler); when(mockLog.leaderEpochCache()).thenReturn(Option.apply(cache)); when(remoteLogMetadataManager.highestOffsetForEpoch(any(TopicIdPartition.class), anyInt())).thenReturn(Optional.of(-1L)); @@ -571,7 +574,7 @@ void testCustomMetadataSizeExceedsLimit() throws Exception { // leader epoch preparation checkpoint.write(totalEpochEntries); - LeaderEpochFileCache cache = new LeaderEpochFileCache(leaderTopicIdPartition.topicPartition(), checkpoint); + LeaderEpochFileCache cache = new LeaderEpochFileCache(leaderTopicIdPartition.topicPartition(), checkpoint, scheduler); when(mockLog.leaderEpochCache()).thenReturn(Option.apply(cache)); when(remoteLogMetadataManager.highestOffsetForEpoch(any(TopicIdPartition.class), anyInt())).thenReturn(Optional.of(-1L)); @@ -661,7 +664,7 @@ void testRemoteLogManagerTasksAvgIdlePercentAndMetadataCountMetrics() throws Exc // leader epoch preparation checkpoint.write(totalEpochEntries); - LeaderEpochFileCache cache = new LeaderEpochFileCache(leaderTopicIdPartition.topicPartition(), checkpoint); + LeaderEpochFileCache cache = new LeaderEpochFileCache(leaderTopicIdPartition.topicPartition(), checkpoint, scheduler); when(mockLog.leaderEpochCache()).thenReturn(Option.apply(cache)); when(remoteLogMetadataManager.highestOffsetForEpoch(any(TopicIdPartition.class), anyInt())).thenReturn(Optional.of(0L)); @@ -778,7 +781,7 @@ void testRemoteLogManagerRemoteMetrics() throws Exception { // leader epoch preparation checkpoint.write(totalEpochEntries); - LeaderEpochFileCache cache = new LeaderEpochFileCache(leaderTopicIdPartition.topicPartition(), checkpoint); + LeaderEpochFileCache cache = new LeaderEpochFileCache(leaderTopicIdPartition.topicPartition(), checkpoint, scheduler); when(mockLog.leaderEpochCache()).thenReturn(Option.apply(cache)); when(remoteLogMetadataManager.highestOffsetForEpoch(any(TopicIdPartition.class), anyInt())).thenReturn(Optional.of(0L)); @@ -928,7 +931,7 @@ void testMetricsUpdateOnCopyLogSegmentsFailure() throws Exception { // leader epoch preparation checkpoint.write(totalEpochEntries); - LeaderEpochFileCache cache = new LeaderEpochFileCache(leaderTopicIdPartition.topicPartition(), checkpoint); + LeaderEpochFileCache cache = new LeaderEpochFileCache(leaderTopicIdPartition.topicPartition(), checkpoint, scheduler); when(mockLog.leaderEpochCache()).thenReturn(Option.apply(cache)); when(remoteLogMetadataManager.highestOffsetForEpoch(any(TopicIdPartition.class), anyInt())).thenReturn(Optional.of(0L)); @@ -1001,7 +1004,7 @@ void testCopyLogSegmentsToRemoteShouldNotCopySegmentForFollower() throws Excepti // leader epoch preparation checkpoint.write(totalEpochEntries); - LeaderEpochFileCache cache = new LeaderEpochFileCache(leaderTopicIdPartition.topicPartition(), checkpoint); + LeaderEpochFileCache cache = new LeaderEpochFileCache(leaderTopicIdPartition.topicPartition(), checkpoint, scheduler); when(mockLog.leaderEpochCache()).thenReturn(Option.apply(cache)); when(remoteLogMetadataManager.highestOffsetForEpoch(any(TopicIdPartition.class), anyInt())).thenReturn(Optional.of(0L)); @@ -1037,7 +1040,7 @@ void testRLMTaskDoesNotUploadSegmentsWhenRemoteLogMetadataManagerIsNotInitialize // leader epoch preparation checkpoint.write(totalEpochEntries); - LeaderEpochFileCache cache = new LeaderEpochFileCache(leaderTopicIdPartition.topicPartition(), checkpoint); + LeaderEpochFileCache cache = new LeaderEpochFileCache(leaderTopicIdPartition.topicPartition(), checkpoint, scheduler); when(mockLog.leaderEpochCache()).thenReturn(Option.apply(cache)); // Throw a retryable exception so indicate that the remote log metadata manager is not initialized yet @@ -1236,7 +1239,7 @@ void testFindOffsetByTimestamp() throws IOException, RemoteStorageException { TreeMap validSegmentEpochs = new TreeMap<>(); validSegmentEpochs.put(targetLeaderEpoch, startOffset); - LeaderEpochFileCache leaderEpochFileCache = new LeaderEpochFileCache(tp, checkpoint); + LeaderEpochFileCache leaderEpochFileCache = new LeaderEpochFileCache(tp, checkpoint, scheduler); leaderEpochFileCache.assign(4, 99L); leaderEpochFileCache.assign(5, 99L); leaderEpochFileCache.assign(targetLeaderEpoch, startOffset); @@ -1270,7 +1273,7 @@ void testFindOffsetByTimestampWithInvalidEpochSegments() throws IOException, Rem validSegmentEpochs.put(targetLeaderEpoch - 1, startOffset - 1); // invalid epochs not aligning with leader epoch cache validSegmentEpochs.put(targetLeaderEpoch, startOffset); - LeaderEpochFileCache leaderEpochFileCache = new LeaderEpochFileCache(tp, checkpoint); + LeaderEpochFileCache leaderEpochFileCache = new LeaderEpochFileCache(tp, checkpoint, scheduler); leaderEpochFileCache.assign(4, 99L); leaderEpochFileCache.assign(5, 99L); leaderEpochFileCache.assign(targetLeaderEpoch, startOffset); @@ -1300,7 +1303,7 @@ void testFindOffsetByTimestampWithSegmentNotReady() throws IOException, RemoteSt TreeMap validSegmentEpochs = new TreeMap<>(); validSegmentEpochs.put(targetLeaderEpoch, startOffset); - LeaderEpochFileCache leaderEpochFileCache = new LeaderEpochFileCache(tp, checkpoint); + LeaderEpochFileCache leaderEpochFileCache = new LeaderEpochFileCache(tp, checkpoint, scheduler); leaderEpochFileCache.assign(4, 99L); leaderEpochFileCache.assign(5, 99L); leaderEpochFileCache.assign(targetLeaderEpoch, startOffset); @@ -1708,7 +1711,7 @@ public void testFindLogStartOffset() throws RemoteStorageException, IOException epochEntries.add(new EpochEntry(2, 550L)); checkpoint.write(epochEntries); - LeaderEpochFileCache cache = new LeaderEpochFileCache(leaderTopicIdPartition.topicPartition(), checkpoint); + LeaderEpochFileCache cache = new LeaderEpochFileCache(leaderTopicIdPartition.topicPartition(), checkpoint, scheduler); when(mockLog.leaderEpochCache()).thenReturn(Option.apply(cache)); long timestamp = time.milliseconds(); @@ -1746,7 +1749,7 @@ public void testFindLogStartOffsetFallbackToLocalLogStartOffsetWhenRemoteIsEmpty epochEntries.add(new EpochEntry(2, 550L)); checkpoint.write(epochEntries); - LeaderEpochFileCache cache = new LeaderEpochFileCache(leaderTopicIdPartition.topicPartition(), checkpoint); + LeaderEpochFileCache cache = new LeaderEpochFileCache(leaderTopicIdPartition.topicPartition(), checkpoint, scheduler); when(mockLog.leaderEpochCache()).thenReturn(Option.apply(cache)); when(mockLog.localLogStartOffset()).thenReturn(250L); when(remoteLogMetadataManager.listRemoteLogSegments(eq(leaderTopicIdPartition), anyInt())) @@ -1771,7 +1774,7 @@ public void testLogStartOffsetUpdatedOnStartup() throws RemoteStorageException, epochEntries.add(new EpochEntry(2, 550L)); checkpoint.write(epochEntries); - LeaderEpochFileCache cache = new LeaderEpochFileCache(leaderTopicIdPartition.topicPartition(), checkpoint); + LeaderEpochFileCache cache = new LeaderEpochFileCache(leaderTopicIdPartition.topicPartition(), checkpoint, scheduler); when(mockLog.leaderEpochCache()).thenReturn(Option.apply(cache)); RemoteLogSegmentMetadata metadata = mock(RemoteLogSegmentMetadata.class); @@ -1814,7 +1817,7 @@ public void testDeletionOnRetentionBreachedSegments(long retentionSize, List epochEntries = Collections.singletonList(epochEntry0); checkpoint.write(epochEntries); - LeaderEpochFileCache cache = new LeaderEpochFileCache(tp, checkpoint); + LeaderEpochFileCache cache = new LeaderEpochFileCache(tp, checkpoint, scheduler); when(mockLog.leaderEpochCache()).thenReturn(Option.apply(cache)); when(mockLog.topicPartition()).thenReturn(leaderTopicIdPartition.topicPartition()); @@ -1867,7 +1870,7 @@ public void testRemoteDeleteLagsOnRetentionBreachedSegments(long retentionSize, List epochEntries = Collections.singletonList(epochEntry0); checkpoint.write(epochEntries); - LeaderEpochFileCache cache = new LeaderEpochFileCache(tp, checkpoint); + LeaderEpochFileCache cache = new LeaderEpochFileCache(tp, checkpoint, scheduler); when(mockLog.leaderEpochCache()).thenReturn(Option.apply(cache)); when(mockLog.topicPartition()).thenReturn(leaderTopicIdPartition.topicPartition()); @@ -1938,7 +1941,7 @@ public void testDeleteRetentionMsBeingCancelledBeforeSecondDelete() throws Remot .thenAnswer(ans -> metadataList.iterator()); checkpoint.write(epochEntries); - LeaderEpochFileCache cache = new LeaderEpochFileCache(tp, checkpoint); + LeaderEpochFileCache cache = new LeaderEpochFileCache(tp, checkpoint, scheduler); when(mockLog.leaderEpochCache()).thenReturn(Option.apply(cache)); Map logProps = new HashMap<>(); @@ -2000,7 +2003,7 @@ public void testFailedDeleteExpiredSegments(long retentionSize, List epochEntries = Collections.singletonList(epochEntry0); checkpoint.write(epochEntries); - LeaderEpochFileCache cache = new LeaderEpochFileCache(tp, checkpoint); + LeaderEpochFileCache cache = new LeaderEpochFileCache(tp, checkpoint, scheduler); when(mockLog.leaderEpochCache()).thenReturn(Option.apply(cache)); when(mockLog.topicPartition()).thenReturn(leaderTopicIdPartition.topicPartition()); @@ -2053,7 +2056,7 @@ public void testDeleteLogSegmentDueToRetentionSizeBreach(int segmentCount, new EpochEntry(4, 100L) ); checkpoint.write(epochEntries); - LeaderEpochFileCache cache = new LeaderEpochFileCache(tp, checkpoint); + LeaderEpochFileCache cache = new LeaderEpochFileCache(tp, checkpoint, scheduler); int currentLeaderEpoch = epochEntries.get(epochEntries.size() - 1).epoch; long localLogSegmentsSize = 512L; @@ -2091,7 +2094,7 @@ public void testDeleteLogSegmentDueToRetentionTimeBreach(int segmentCount, new EpochEntry(4, 100L) ); checkpoint.write(epochEntries); - LeaderEpochFileCache cache = new LeaderEpochFileCache(tp, checkpoint); + LeaderEpochFileCache cache = new LeaderEpochFileCache(tp, checkpoint, scheduler); int currentLeaderEpoch = epochEntries.get(epochEntries.size() - 1).epoch; long localLogSegmentsSize = 512L; @@ -2178,7 +2181,7 @@ public RemoteLogMetadataManager createRemoteLogMetadataManager() { .thenReturn(remoteLogSegmentMetadatas.iterator()); checkpoint.write(epochEntries); - LeaderEpochFileCache cache = new LeaderEpochFileCache(tp, checkpoint); + LeaderEpochFileCache cache = new LeaderEpochFileCache(tp, checkpoint, scheduler); when(mockLog.leaderEpochCache()).thenReturn(Option.apply(cache)); Map logProps = new HashMap<>(); @@ -2255,7 +2258,7 @@ private Map truncateAndGetLeaderEpochs(List entries, Long endOffset) { InMemoryLeaderEpochCheckpoint myCheckpoint = new InMemoryLeaderEpochCheckpoint(); myCheckpoint.write(entries); - LeaderEpochFileCache cache = new LeaderEpochFileCache(null, myCheckpoint); + LeaderEpochFileCache cache = new LeaderEpochFileCache(null, myCheckpoint, scheduler); cache.truncateFromStart(startOffset); cache.truncateFromEnd(endOffset); return myCheckpoint.read().stream().collect(Collectors.toMap(e -> e.epoch, e -> e.startOffset)); diff --git a/core/src/test/scala/unit/kafka/cluster/PartitionLockTest.scala b/core/src/test/scala/unit/kafka/cluster/PartitionLockTest.scala index 32ddfc6418d4d..2e9bc068978bd 100644 --- a/core/src/test/scala/unit/kafka/cluster/PartitionLockTest.scala +++ b/core/src/test/scala/unit/kafka/cluster/PartitionLockTest.scala @@ -299,7 +299,8 @@ class PartitionLockTest extends Logging { val log = super.createLog(isNew, isFutureReplica, offsetCheckpoints, None, None) val logDirFailureChannel = new LogDirFailureChannel(1) val segments = new LogSegments(log.topicPartition) - val leaderEpochCache = UnifiedLog.maybeCreateLeaderEpochCache(log.dir, log.topicPartition, logDirFailureChannel, log.config.recordVersion, "") + val leaderEpochCache = UnifiedLog.maybeCreateLeaderEpochCache( + log.dir, log.topicPartition, logDirFailureChannel, log.config.recordVersion, "", None, mockTime.scheduler) val maxTransactionTimeout = 5 * 60 * 1000 val producerStateManagerConfig = new ProducerStateManagerConfig(TransactionLogConfigs.PRODUCER_ID_EXPIRATION_MS_DEFAULT, false) val producerStateManager = new ProducerStateManager( diff --git a/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala b/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala index 45ae991744a3f..cbb3f51d9e294 100644 --- a/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala +++ b/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala @@ -433,7 +433,8 @@ class PartitionTest extends AbstractPartitionTest { val log = super.createLog(isNew, isFutureReplica, offsetCheckpoints, None, None) val logDirFailureChannel = new LogDirFailureChannel(1) val segments = new LogSegments(log.topicPartition) - val leaderEpochCache = UnifiedLog.maybeCreateLeaderEpochCache(log.dir, log.topicPartition, logDirFailureChannel, log.config.recordVersion, "") + val leaderEpochCache = UnifiedLog.maybeCreateLeaderEpochCache( + log.dir, log.topicPartition, logDirFailureChannel, log.config.recordVersion, "", None, time.scheduler) val maxTransactionTimeoutMs = 5 * 60 * 1000 val producerStateManagerConfig = new ProducerStateManagerConfig(TransactionLogConfigs.PRODUCER_ID_EXPIRATION_MS_DEFAULT, true) val producerStateManager = new ProducerStateManager( diff --git a/core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala b/core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala index 0c01419f2111d..df80c6bed8ca4 100644 --- a/core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala +++ b/core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala @@ -107,7 +107,8 @@ class LogCleanerManagerTest extends Logging { val maxTransactionTimeoutMs = 5 * 60 * 1000 val producerIdExpirationCheckIntervalMs = TransactionLogConfigs.PRODUCER_ID_EXPIRATION_CHECK_INTERVAL_MS_DEFAULT val segments = new LogSegments(tp) - val leaderEpochCache = UnifiedLog.maybeCreateLeaderEpochCache(tpDir, topicPartition, logDirFailureChannel, config.recordVersion, "") + val leaderEpochCache = UnifiedLog.maybeCreateLeaderEpochCache( + tpDir, topicPartition, logDirFailureChannel, config.recordVersion, "", None, time.scheduler) val producerStateManager = new ProducerStateManager(topicPartition, tpDir, maxTransactionTimeoutMs, producerStateManagerConfig, time) val offsets = new LogLoader( tpDir, diff --git a/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala b/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala index 8afed829bd38c..fe07fd79cf2dc 100644 --- a/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala +++ b/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala @@ -91,7 +91,7 @@ class LogCleanerTest extends Logging { val mockMetricsGroup = mockMetricsGroupCtor.constructed.get(0) val numMetricsRegistered = LogCleaner.MetricNames.size verify(mockMetricsGroup, times(numMetricsRegistered)).newGauge(anyString(), any()) - + // verify that each metric in `LogCleaner` is removed LogCleaner.MetricNames.foreach(verify(mockMetricsGroup).removeMetric(_)) @@ -166,7 +166,8 @@ class LogCleanerTest extends Logging { val maxTransactionTimeoutMs = 5 * 60 * 1000 val producerIdExpirationCheckIntervalMs = TransactionLogConfigs.PRODUCER_ID_EXPIRATION_CHECK_INTERVAL_MS_DEFAULT val logSegments = new LogSegments(topicPartition) - val leaderEpochCache = UnifiedLog.maybeCreateLeaderEpochCache(dir, topicPartition, logDirFailureChannel, config.recordVersion, "") + val leaderEpochCache = UnifiedLog.maybeCreateLeaderEpochCache( + dir, topicPartition, logDirFailureChannel, config.recordVersion, "", None, time.scheduler) val producerStateManager = new ProducerStateManager(topicPartition, dir, maxTransactionTimeoutMs, producerStateManagerConfig, time) val offsets = new LogLoader( diff --git a/core/src/test/scala/unit/kafka/log/LogLoaderTest.scala b/core/src/test/scala/unit/kafka/log/LogLoaderTest.scala index 02dbf35e44056..6062d6f6c2ebf 100644 --- a/core/src/test/scala/unit/kafka/log/LogLoaderTest.scala +++ b/core/src/test/scala/unit/kafka/log/LogLoaderTest.scala @@ -153,7 +153,8 @@ class LogLoaderTest { val logStartOffset = logStartOffsets.getOrElse(topicPartition, 0L) val logDirFailureChannel: LogDirFailureChannel = new LogDirFailureChannel(1) val segments = new LogSegments(topicPartition) - val leaderEpochCache = UnifiedLog.maybeCreateLeaderEpochCache(logDir, topicPartition, logDirFailureChannel, config.recordVersion, "") + val leaderEpochCache = UnifiedLog.maybeCreateLeaderEpochCache( + logDir, topicPartition, logDirFailureChannel, config.recordVersion, "", None, time.scheduler) val producerStateManager = new ProducerStateManager(topicPartition, logDir, this.maxTransactionTimeoutMs, this.producerStateManagerConfig, time) val logLoader = new LogLoader(logDir, topicPartition, config, time.scheduler, time, @@ -366,7 +367,8 @@ class LogLoaderTest { super.add(wrapper) } } - val leaderEpochCache = UnifiedLog.maybeCreateLeaderEpochCache(logDir, topicPartition, logDirFailureChannel, logConfig.recordVersion, "") + val leaderEpochCache = UnifiedLog.maybeCreateLeaderEpochCache( + logDir, topicPartition, logDirFailureChannel, logConfig.recordVersion, "", None, mockTime.scheduler) val producerStateManager = new ProducerStateManager(topicPartition, logDir, maxTransactionTimeoutMs, producerStateManagerConfig, mockTime) val logLoader = new LogLoader( @@ -430,7 +432,8 @@ class LogLoaderTest { val logDirFailureChannel: LogDirFailureChannel = new LogDirFailureChannel(1) val config = new LogConfig(new Properties()) val segments = new LogSegments(topicPartition) - val leaderEpochCache = UnifiedLog.maybeCreateLeaderEpochCache(logDir, topicPartition, logDirFailureChannel, config.recordVersion, "") + val leaderEpochCache = UnifiedLog.maybeCreateLeaderEpochCache( + logDir, topicPartition, logDirFailureChannel, config.recordVersion, "", None, mockTime.scheduler) val offsets = new LogLoader( logDir, topicPartition, @@ -539,7 +542,8 @@ class LogLoaderTest { val config = new LogConfig(logProps) val logDirFailureChannel = null val segments = new LogSegments(topicPartition) - val leaderEpochCache = UnifiedLog.maybeCreateLeaderEpochCache(logDir, topicPartition, logDirFailureChannel, config.recordVersion, "") + val leaderEpochCache = UnifiedLog.maybeCreateLeaderEpochCache( + logDir, topicPartition, logDirFailureChannel, config.recordVersion, "", None, mockTime.scheduler) val offsets = new LogLoader( logDir, topicPartition, @@ -593,7 +597,8 @@ class LogLoaderTest { val config = new LogConfig(logProps) val logDirFailureChannel = null val segments = new LogSegments(topicPartition) - val leaderEpochCache = UnifiedLog.maybeCreateLeaderEpochCache(logDir, topicPartition, logDirFailureChannel, config.recordVersion, "") + val leaderEpochCache = UnifiedLog.maybeCreateLeaderEpochCache( + logDir, topicPartition, logDirFailureChannel, config.recordVersion, "", None, mockTime.scheduler) val offsets = new LogLoader( logDir, topicPartition, @@ -646,7 +651,8 @@ class LogLoaderTest { val config = new LogConfig(logProps) val logDirFailureChannel = null val segments = new LogSegments(topicPartition) - val leaderEpochCache = UnifiedLog.maybeCreateLeaderEpochCache(logDir, topicPartition, logDirFailureChannel, config.recordVersion, "") + val leaderEpochCache = UnifiedLog.maybeCreateLeaderEpochCache( + logDir, topicPartition, logDirFailureChannel, config.recordVersion, "", None, mockTime.scheduler) val offsets = new LogLoader( logDir, topicPartition, @@ -1788,7 +1794,8 @@ class LogLoaderTest { log.logSegments.forEach(segment => segments.add(segment)) assertEquals(5, segments.firstSegment.get.baseOffset) - val leaderEpochCache = UnifiedLog.maybeCreateLeaderEpochCache(logDir, topicPartition, logDirFailureChannel, logConfig.recordVersion, "") + val leaderEpochCache = UnifiedLog.maybeCreateLeaderEpochCache( + logDir, topicPartition, logDirFailureChannel, logConfig.recordVersion, "", None, mockTime.scheduler) val offsets = new LogLoader( logDir, topicPartition, diff --git a/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala b/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala index b545157d3c4b4..4b3b5c7de3029 100644 --- a/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala +++ b/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala @@ -23,6 +23,7 @@ import org.apache.kafka.common.config.TopicConfig import org.apache.kafka.common.record._ import org.apache.kafka.common.utils.{MockTime, Time, Utils} import org.apache.kafka.coordinator.transaction.TransactionLogConfigs +import org.apache.kafka.server.util.MockScheduler import org.apache.kafka.storage.internals.checkpoint.LeaderEpochCheckpoint import org.apache.kafka.storage.internals.epoch.LeaderEpochFileCache import org.apache.kafka.storage.internals.log._ @@ -405,14 +406,14 @@ class LogSegmentTest { val checkpoint: LeaderEpochCheckpoint = new LeaderEpochCheckpoint { private var epochs = Seq.empty[EpochEntry] - override def write(epochs: util.Collection[EpochEntry], ignored: Boolean): Unit = { + override def write(epochs: util.Collection[EpochEntry]): Unit = { this.epochs = epochs.asScala.toSeq } override def read(): java.util.List[EpochEntry] = this.epochs.asJava } - val cache = new LeaderEpochFileCache(topicPartition, checkpoint) + val cache = new LeaderEpochFileCache(topicPartition, checkpoint, new MockScheduler(new MockTime())) seg.append(105L, RecordBatch.NO_TIMESTAMP, 104L, MemoryRecords.withRecords(104L, CompressionType.NONE, 0, new SimpleRecord("a".getBytes), new SimpleRecord("b".getBytes))) diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala index afc1937e45081..055271c44ede9 100644 --- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala +++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala @@ -2835,7 +2835,8 @@ class ReplicaManagerTest { val maxTransactionTimeoutMs = 30000 val maxProducerIdExpirationMs = 30000 val segments = new LogSegments(tp) - val leaderEpochCache = UnifiedLog.maybeCreateLeaderEpochCache(logDir, tp, mockLogDirFailureChannel, logConfig.recordVersion, "") + val leaderEpochCache = UnifiedLog.maybeCreateLeaderEpochCache( + logDir, tp, mockLogDirFailureChannel, logConfig.recordVersion, "", None, time.scheduler) val producerStateManager = new ProducerStateManager(tp, logDir, maxTransactionTimeoutMs, new ProducerStateManagerConfig(maxProducerIdExpirationMs, true), time) val offsets = new LogLoader( @@ -6414,7 +6415,7 @@ class ReplicaManagerTest { partition.createLogIfNotExists(isNew = false, isFutureReplica = false, offsetCheckpoints, None) val leaderAndIsr = LeaderAndIsr(0, 1, List(0, 1), LeaderRecoveryState.RECOVERED, LeaderAndIsr.InitialPartitionEpoch) - val becomeLeaderRequest = makeLeaderAndIsrRequest(topicIds(tp0.topic), tp0, Seq(0, 1), leaderAndIsr) + val becomeLeaderRequest = makeLeaderAndIsrRequest(topicIds(tp0.topic), tp0, Seq(0, 1), leaderAndIsr) replicaManager.becomeLeaderOrFollower(1, becomeLeaderRequest, (_, _) => ()) verifyRLMOnLeadershipChange(Collections.singleton(partition), Collections.emptySet()) diff --git a/core/src/test/scala/unit/kafka/server/checkpoints/OffsetCheckpointFileWithFailureHandlerTest.scala b/core/src/test/scala/unit/kafka/server/checkpoints/OffsetCheckpointFileWithFailureHandlerTest.scala index a7e370d7f4091..ddbf58d884e30 100644 --- a/core/src/test/scala/unit/kafka/server/checkpoints/OffsetCheckpointFileWithFailureHandlerTest.scala +++ b/core/src/test/scala/unit/kafka/server/checkpoints/OffsetCheckpointFileWithFailureHandlerTest.scala @@ -97,7 +97,7 @@ class OffsetCheckpointFileWithFailureHandlerTest extends Logging { val logDirFailureChannel = new LogDirFailureChannel(10) val checkpointFile = new CheckpointFileWithFailureHandler(file, OffsetCheckpointFile.CurrentVersion + 1, OffsetCheckpointFile.Formatter, logDirFailureChannel, file.getParent) - checkpointFile.write(Collections.singletonList(new TopicPartition("foo", 5) -> 10L), true) + checkpointFile.write(Collections.singletonList(new TopicPartition("foo", 5) -> 10L)) assertThrows(classOf[KafkaStorageException], () => new OffsetCheckpointFile(checkpointFile.file, logDirFailureChannel).read()) } diff --git a/core/src/test/scala/unit/kafka/server/epoch/LeaderEpochFileCacheTest.scala b/core/src/test/scala/unit/kafka/server/epoch/LeaderEpochFileCacheTest.scala index 05041f39709f7..5aee7f0bdb6e2 100644 --- a/core/src/test/scala/unit/kafka/server/epoch/LeaderEpochFileCacheTest.scala +++ b/core/src/test/scala/unit/kafka/server/epoch/LeaderEpochFileCacheTest.scala @@ -20,6 +20,7 @@ package kafka.server.epoch import kafka.utils.TestUtils import org.apache.kafka.common.TopicPartition import org.apache.kafka.common.requests.OffsetsForLeaderEpochResponse.{UNDEFINED_EPOCH, UNDEFINED_EPOCH_OFFSET} +import org.apache.kafka.server.util.MockTime import org.apache.kafka.storage.internals.checkpoint.{LeaderEpochCheckpoint, LeaderEpochCheckpointFile} import org.apache.kafka.storage.internals.epoch.LeaderEpochFileCache import org.apache.kafka.storage.internals.log.{EpochEntry, LogDirFailureChannel} @@ -27,7 +28,7 @@ import org.junit.jupiter.api.Assertions._ import org.junit.jupiter.api.Test import java.io.File -import java.util.{Collections, OptionalInt, Optional} +import java.util.{Collections, Optional, OptionalInt} import scala.collection.Seq import scala.jdk.CollectionConverters._ @@ -38,11 +39,11 @@ class LeaderEpochFileCacheTest { val tp = new TopicPartition("TestTopic", 5) private val checkpoint: LeaderEpochCheckpoint = new LeaderEpochCheckpoint { private var epochs: Seq[EpochEntry] = Seq() - override def write(epochs: java.util.Collection[EpochEntry], ignored: Boolean): Unit = this.epochs = epochs.asScala.toSeq + override def write(epochs: java.util.Collection[EpochEntry]): Unit = this.epochs = epochs.asScala.toSeq override def read(): java.util.List[EpochEntry] = this.epochs.asJava } - private val cache = new LeaderEpochFileCache(tp, checkpoint) + private val cache = new LeaderEpochFileCache(tp, checkpoint, new MockTime().scheduler) @Test def testPreviousEpoch(): Unit = { @@ -245,12 +246,12 @@ class LeaderEpochFileCacheTest { val checkpoint = new LeaderEpochCheckpointFile(new File(checkpointPath), new LogDirFailureChannel(1)) //Given - val cache = new LeaderEpochFileCache(tp, checkpoint) + val cache = new LeaderEpochFileCache(tp, checkpoint, new MockTime().scheduler) cache.assign(2, 6) //When val checkpoint2 = new LeaderEpochCheckpointFile(new File(checkpointPath), new LogDirFailureChannel(1)) - val cache2 = new LeaderEpochFileCache(tp, checkpoint2) + val cache2 = new LeaderEpochFileCache(tp, checkpoint2, new MockTime().scheduler) //Then assertEquals(1, cache2.epochEntries.size) diff --git a/core/src/test/scala/unit/kafka/utils/SchedulerTest.scala b/core/src/test/scala/unit/kafka/utils/SchedulerTest.scala index d25fdb1b4e9b1..6280318af5d60 100644 --- a/core/src/test/scala/unit/kafka/utils/SchedulerTest.scala +++ b/core/src/test/scala/unit/kafka/utils/SchedulerTest.scala @@ -139,7 +139,8 @@ class SchedulerTest { val topicPartition = UnifiedLog.parseTopicPartitionName(logDir) val logDirFailureChannel = new LogDirFailureChannel(10) val segments = new LogSegments(topicPartition) - val leaderEpochCache = UnifiedLog.maybeCreateLeaderEpochCache(logDir, topicPartition, logDirFailureChannel, logConfig.recordVersion, "") + val leaderEpochCache = UnifiedLog.maybeCreateLeaderEpochCache( + logDir, topicPartition, logDirFailureChannel, logConfig.recordVersion, "", None, mockTime.scheduler) val producerStateManager = new ProducerStateManager(topicPartition, logDir, maxTransactionTimeoutMs, new ProducerStateManagerConfig(maxProducerIdExpirationMs, false), mockTime) val offsets = new LogLoader( diff --git a/server-common/src/main/java/org/apache/kafka/server/common/CheckpointFile.java b/server-common/src/main/java/org/apache/kafka/server/common/CheckpointFile.java index 9c115881328a0..6efbaa136e0e9 100644 --- a/server-common/src/main/java/org/apache/kafka/server/common/CheckpointFile.java +++ b/server-common/src/main/java/org/apache/kafka/server/common/CheckpointFile.java @@ -72,7 +72,7 @@ public CheckpointFile(File file, tempPath = Paths.get(absolutePath + ".tmp"); } - public void write(Collection entries, boolean sync) throws IOException { + public void write(Collection entries) throws IOException { synchronized (lock) { // write to temp file and then swap with the existing file try (FileOutputStream fileOutputStream = new FileOutputStream(tempPath.toFile()); @@ -80,12 +80,10 @@ public void write(Collection entries, boolean sync) throws IOException { CheckpointWriteBuffer checkpointWriteBuffer = new CheckpointWriteBuffer<>(writer, version, formatter); checkpointWriteBuffer.write(entries); writer.flush(); - if (sync) { - fileOutputStream.getFD().sync(); - } + fileOutputStream.getFD().sync(); } - Utils.atomicMoveWithFallback(tempPath, absolutePath, sync); + Utils.atomicMoveWithFallback(tempPath, absolutePath); } } diff --git a/storage/src/main/java/org/apache/kafka/storage/internals/checkpoint/CheckpointFileWithFailureHandler.java b/storage/src/main/java/org/apache/kafka/storage/internals/checkpoint/CheckpointFileWithFailureHandler.java index 35abfb5a984d2..f780ced9b0477 100644 --- a/storage/src/main/java/org/apache/kafka/storage/internals/checkpoint/CheckpointFileWithFailureHandler.java +++ b/storage/src/main/java/org/apache/kafka/storage/internals/checkpoint/CheckpointFileWithFailureHandler.java @@ -41,9 +41,9 @@ public CheckpointFileWithFailureHandler(File file, int version, CheckpointFile.E checkpointFile = new CheckpointFile<>(file, version, formatter); } - public void write(Collection entries, boolean sync) { + public void write(Collection entries) { try { - checkpointFile.write(entries, sync); + checkpointFile.write(entries); } catch (IOException e) { String msg = "Error while writing to checkpoint file " + file.getAbsolutePath(); logDirFailureChannel.maybeAddOfflineLogDir(logDir, msg, e); diff --git a/storage/src/main/java/org/apache/kafka/storage/internals/checkpoint/InMemoryLeaderEpochCheckpoint.java b/storage/src/main/java/org/apache/kafka/storage/internals/checkpoint/InMemoryLeaderEpochCheckpoint.java index 499c19fb78b5b..472441be3872a 100644 --- a/storage/src/main/java/org/apache/kafka/storage/internals/checkpoint/InMemoryLeaderEpochCheckpoint.java +++ b/storage/src/main/java/org/apache/kafka/storage/internals/checkpoint/InMemoryLeaderEpochCheckpoint.java @@ -42,7 +42,7 @@ public class InMemoryLeaderEpochCheckpoint implements LeaderEpochCheckpoint { private List epochs = Collections.emptyList(); - public void write(Collection epochs, boolean ignored) { + public void write(Collection epochs) { this.epochs = new ArrayList<>(epochs); } diff --git a/storage/src/main/java/org/apache/kafka/storage/internals/checkpoint/LeaderEpochCheckpoint.java b/storage/src/main/java/org/apache/kafka/storage/internals/checkpoint/LeaderEpochCheckpoint.java index 28ffae03df0e1..e66e82a0f64f7 100644 --- a/storage/src/main/java/org/apache/kafka/storage/internals/checkpoint/LeaderEpochCheckpoint.java +++ b/storage/src/main/java/org/apache/kafka/storage/internals/checkpoint/LeaderEpochCheckpoint.java @@ -22,13 +22,7 @@ import java.util.List; public interface LeaderEpochCheckpoint { - // in file-backed checkpoint implementation, the content should be - // synced to the device if `sync` is true - void write(Collection epochs, boolean sync); - - default void write(Collection epochs) { - write(epochs, true); - } + void write(Collection epochs); List read(); } diff --git a/storage/src/main/java/org/apache/kafka/storage/internals/checkpoint/LeaderEpochCheckpointFile.java b/storage/src/main/java/org/apache/kafka/storage/internals/checkpoint/LeaderEpochCheckpointFile.java index 3472182aeea1e..d387352df040f 100644 --- a/storage/src/main/java/org/apache/kafka/storage/internals/checkpoint/LeaderEpochCheckpointFile.java +++ b/storage/src/main/java/org/apache/kafka/storage/internals/checkpoint/LeaderEpochCheckpointFile.java @@ -53,11 +53,7 @@ public LeaderEpochCheckpointFile(File file, LogDirFailureChannel logDirFailureCh } public void write(Collection epochs) { - write(epochs, true); - } - - public void write(Collection epochs, boolean sync) { - checkpoint.write(epochs, sync); + checkpoint.write(epochs); } public List read() { diff --git a/storage/src/main/java/org/apache/kafka/storage/internals/epoch/LeaderEpochFileCache.java b/storage/src/main/java/org/apache/kafka/storage/internals/epoch/LeaderEpochFileCache.java index 03df6cc0dceb9..49a5d062d3f36 100644 --- a/storage/src/main/java/org/apache/kafka/storage/internals/epoch/LeaderEpochFileCache.java +++ b/storage/src/main/java/org/apache/kafka/storage/internals/epoch/LeaderEpochFileCache.java @@ -18,6 +18,7 @@ import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.server.util.Scheduler; import org.apache.kafka.storage.internals.log.EpochEntry; import org.apache.kafka.storage.internals.checkpoint.LeaderEpochCheckpoint; import org.slf4j.Logger; @@ -42,10 +43,15 @@ *

* Leader Epoch = epoch assigned to each leader by the controller. * Offset = offset of the first message in each epoch. + *

+ * Note that {@link #truncateFromStart},{@link #truncateFromEnd} flushes the epoch-entry changes to checkpoint asynchronously. + * Hence, it is instantiater's responsibility to ensure restoring the cache to the correct state after instantiating + * this class from checkpoint (which might contain stale epoch entries right after instantiation). */ public class LeaderEpochFileCache { private final TopicPartition topicPartition; private final LeaderEpochCheckpoint checkpoint; + private final Scheduler scheduler; private final Logger log; private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); @@ -55,16 +61,40 @@ public class LeaderEpochFileCache { /** * @param topicPartition the associated topic partition * @param checkpoint the checkpoint file + * @param scheduler the scheduler to use for async I/O operations */ @SuppressWarnings("this-escape") - public LeaderEpochFileCache(TopicPartition topicPartition, LeaderEpochCheckpoint checkpoint) { + public LeaderEpochFileCache(TopicPartition topicPartition, LeaderEpochCheckpoint checkpoint, Scheduler scheduler) { this.checkpoint = checkpoint; this.topicPartition = topicPartition; + this.scheduler = scheduler; LogContext logContext = new LogContext("[LeaderEpochCache " + topicPartition + "] "); log = logContext.logger(LeaderEpochFileCache.class); checkpoint.read().forEach(this::assign); } + /** + * Instantiates a new LeaderEpochFileCache with replacing checkpoint with given one + * without restoring the cache from the checkpoint, with retaining the current epoch entries. + * @param epochEntries the current epoch entries + * @param topicPartition the associated topic partition + * @param checkpoint the checkpoint file + * @param scheduler the scheduler to use for async I/O operations + */ + private LeaderEpochFileCache(List epochEntries, + TopicPartition topicPartition, + LeaderEpochCheckpoint checkpoint, + Scheduler scheduler) { + this.checkpoint = checkpoint; + this.topicPartition = topicPartition; + this.scheduler = scheduler; + LogContext logContext = new LogContext("[LeaderEpochCache " + topicPartition + "] "); + log = logContext.logger(LeaderEpochFileCache.class); + for (EpochEntry entry : epochEntries) { + epochs.put(entry.epoch, entry); + } + } + /** * Assigns the supplied Leader Epoch to the supplied Offset * Once the epoch is assigned it cannot be reassigned @@ -73,7 +103,7 @@ public void assign(int epoch, long startOffset) { EpochEntry entry = new EpochEntry(epoch, startOffset); if (assign(entry)) { log.debug("Appended new epoch entry {}. Cache now contains {} entries.", entry, epochs.size()); - writeToFile(true); + writeToFile(); } } @@ -83,7 +113,7 @@ public void assign(List entries) { log.debug("Appended new epoch entry {}. Cache now contains {} entries.", entry, epochs.size()); } }); - if (!entries.isEmpty()) writeToFile(true); + if (!entries.isEmpty()) writeToFile(); } private boolean isUpdateNeeded(EpochEntry entry) { @@ -305,6 +335,8 @@ public Map.Entry endOffsetFor(int requestedEpoch, long logEndOffs /** * Removes all epoch entries from the store with start offsets greater than or equal to the passed offset. + *

+ * Checkpoint-flushing is done asynchronously. */ public void truncateFromEnd(long endOffset) { lock.writeLock().lock(); @@ -313,14 +345,14 @@ public void truncateFromEnd(long endOffset) { if (endOffset >= 0 && epochEntry.isPresent() && epochEntry.get().startOffset >= endOffset) { List removedEntries = removeFromEnd(x -> x.startOffset >= endOffset); - // We intentionally don't force flushing change to the device here because: + // We flush the change to the device in the background because: // - To avoid fsync latency // * fsync latency could be huge on a disk glitch, which is not rare in spinning drives // * This method is called by ReplicaFetcher threads, which could block replica fetching // then causing ISR shrink or high produce response time degradation in remote scope on high fsync latency. // - Even when stale epochs remained in LeaderEpoch file due to the unclean shutdown, it will be handled by // another truncateFromEnd call on log loading procedure so it won't be a problem - writeToFile(false); + scheduler.scheduleOnce("leader-epoch-cache-flush-" + topicPartition, this::writeToFile); log.debug("Cleared entries {} from epoch cache after truncating to end offset {}, leaving {} entries in the cache.", removedEntries, endOffset, epochs.size()); } @@ -334,6 +366,8 @@ public void truncateFromEnd(long endOffset) { * be offset, then clears any previous epoch entries. *

* This method is exclusive: so truncateFromStart(6) will retain an entry at offset 6. + *

+ * Checkpoint-flushing is done asynchronously. * * @param startOffset the offset to clear up to */ @@ -347,14 +381,14 @@ public void truncateFromStart(long startOffset) { EpochEntry updatedFirstEntry = new EpochEntry(firstBeforeStartOffset.epoch, startOffset); epochs.put(updatedFirstEntry.epoch, updatedFirstEntry); - // We intentionally don't force flushing change to the device here because: + // We flush the change to the device in the background because: // - To avoid fsync latency // * fsync latency could be huge on a disk glitch, which is not rare in spinning drives // * This method is called as part of deleteRecords with holding UnifiedLog#lock. // - Meanwhile all produces against the partition will be blocked, which causes req-handlers to exhaust // - Even when stale epochs remained in LeaderEpoch file due to the unclean shutdown, it will be recovered by // another truncateFromStart call on log loading procedure so it won't be a problem - writeToFile(false); + scheduler.scheduleOnce("leader-epoch-cache-flush-" + topicPartition, this::writeToFile); log.debug("Cleared entries {} and rewrote first entry {} after truncating to start offset {}, leaving {} in the cache.", removedEntries, updatedFirstEntry, startOffset, epochs.size()); } @@ -390,7 +424,27 @@ public LeaderEpochFileCache writeTo(LeaderEpochCheckpoint leaderEpochCheckpoint) lock.readLock().lock(); try { leaderEpochCheckpoint.write(epochEntries()); - return new LeaderEpochFileCache(topicPartition, leaderEpochCheckpoint); + // We instantiate LeaderEpochFileCache after writing leaderEpochCheckpoint, + // hence it is guaranteed that the new cache is consistent with the latest epoch entries. + return new LeaderEpochFileCache(topicPartition, leaderEpochCheckpoint, scheduler); + } finally { + lock.readLock().unlock(); + } + } + + /** + * Returns a new LeaderEpochFileCache which contains same + * epoch entries with replacing backing checkpoint + * @param leaderEpochCheckpoint the new checkpoint + * @return a new LeaderEpochFileCache instance + */ + public LeaderEpochFileCache withCheckpoint(LeaderEpochCheckpoint leaderEpochCheckpoint) { + lock.readLock().lock(); + try { + return new LeaderEpochFileCache(epochEntries(), + topicPartition, + leaderEpochCheckpoint, + scheduler); } finally { lock.readLock().unlock(); } @@ -403,7 +457,7 @@ public void clearAndFlush() { lock.writeLock().lock(); try { epochs.clear(); - writeToFile(true); + writeToFile(); } finally { lock.writeLock().unlock(); } @@ -440,10 +494,10 @@ public NavigableMap epochWithOffsets() { } } - private void writeToFile(boolean sync) { + private void writeToFile() { lock.readLock().lock(); try { - checkpoint.write(epochs.values(), sync); + checkpoint.write(epochs.values()); } finally { lock.readLock().unlock(); } From 7906024a3b8aca1e7071148bcc315f059a01dceb Mon Sep 17 00:00:00 2001 From: Haruki Okada Date: Sun, 19 May 2024 23:27:17 +0900 Subject: [PATCH 02/11] add test --- .../server/epoch/LeaderEpochFileCacheTest.scala | 14 +++++++++++++- 1 file changed, 13 insertions(+), 1 deletion(-) diff --git a/core/src/test/scala/unit/kafka/server/epoch/LeaderEpochFileCacheTest.scala b/core/src/test/scala/unit/kafka/server/epoch/LeaderEpochFileCacheTest.scala index 5aee7f0bdb6e2..925325d2c6efe 100644 --- a/core/src/test/scala/unit/kafka/server/epoch/LeaderEpochFileCacheTest.scala +++ b/core/src/test/scala/unit/kafka/server/epoch/LeaderEpochFileCacheTest.scala @@ -37,13 +37,14 @@ import scala.jdk.CollectionConverters._ */ class LeaderEpochFileCacheTest { val tp = new TopicPartition("TestTopic", 5) + val mockTime = new MockTime() private val checkpoint: LeaderEpochCheckpoint = new LeaderEpochCheckpoint { private var epochs: Seq[EpochEntry] = Seq() override def write(epochs: java.util.Collection[EpochEntry]): Unit = this.epochs = epochs.asScala.toSeq override def read(): java.util.List[EpochEntry] = this.epochs.asJava } - private val cache = new LeaderEpochFileCache(tp, checkpoint, new MockTime().scheduler) + private val cache = new LeaderEpochFileCache(tp, checkpoint, mockTime.scheduler) @Test def testPreviousEpoch(): Unit = { @@ -659,4 +660,15 @@ class LeaderEpochFileCacheTest { assertEquals(OptionalInt.empty(), cache.epochForOffset(5)) } + @Test + def shouldWriteCheckpointOnTruncation(): Unit = { + cache.assign(2, 6) + cache.assign(3, 8) + cache.assign(4, 11) + + cache.truncateFromEnd(11) + cache.truncateFromStart(8) + + assertEquals(List(new EpochEntry(3, 8)).asJava, checkpoint.read()) + } } From 79c26e98bdc6e49e5e406cfdf4ea2b8703866834 Mon Sep 17 00:00:00 2001 From: Haruki Okada Date: Thu, 23 May 2024 08:55:20 +0900 Subject: [PATCH 03/11] address comments --- .../src/main/scala/kafka/log/UnifiedLog.scala | 4 +-- .../internals/epoch/LeaderEpochFileCache.java | 29 ++++++++++--------- 2 files changed, 17 insertions(+), 16 deletions(-) diff --git a/core/src/main/scala/kafka/log/UnifiedLog.scala b/core/src/main/scala/kafka/log/UnifiedLog.scala index 21c0ee6c2a5c1..d0f09770e0a3b 100644 --- a/core/src/main/scala/kafka/log/UnifiedLog.scala +++ b/core/src/main/scala/kafka/log/UnifiedLog.scala @@ -2101,8 +2101,8 @@ object UnifiedLog extends Logging { } /** - * If the recordVersion is >= RecordVersion.V2, then create a new LeaderEpochFileCache instance - * or update current cache if any with the new checkpoint and return it. + * If the recordVersion is >= RecordVersion.V2, create a new LeaderEpochFileCache instance. + * Loading the epoch entries from the backing checkpoint file or the provided currentCache if not empty. * Otherwise, the message format is considered incompatible and the existing LeaderEpoch file * is deleted. * diff --git a/storage/src/main/java/org/apache/kafka/storage/internals/epoch/LeaderEpochFileCache.java b/storage/src/main/java/org/apache/kafka/storage/internals/epoch/LeaderEpochFileCache.java index 49a5d062d3f36..efdd0f381c0cb 100644 --- a/storage/src/main/java/org/apache/kafka/storage/internals/epoch/LeaderEpochFileCache.java +++ b/storage/src/main/java/org/apache/kafka/storage/internals/epoch/LeaderEpochFileCache.java @@ -44,7 +44,7 @@ * Leader Epoch = epoch assigned to each leader by the controller. * Offset = offset of the first message in each epoch. *

- * Note that {@link #truncateFromStart},{@link #truncateFromEnd} flushes the epoch-entry changes to checkpoint asynchronously. + * Note that {@link #truncateFromStart},{@link #truncateFromEnd} flush the epoch-entry changes to checkpoint asynchronously. * Hence, it is instantiater's responsibility to ensure restoring the cache to the correct state after instantiating * this class from checkpoint (which might contain stale epoch entries right after instantiation). */ @@ -74,8 +74,8 @@ public LeaderEpochFileCache(TopicPartition topicPartition, LeaderEpochCheckpoint } /** - * Instantiates a new LeaderEpochFileCache with replacing checkpoint with given one - * without restoring the cache from the checkpoint, with retaining the current epoch entries. + * Instantiate a new LeaderEpochFileCache with provided epoch entries instead of from the backing checkpoint file. + * The provided epoch entries are expected to no less fresher than the checkpoint file. * @param epochEntries the current epoch entries * @param topicPartition the associated topic partition * @param checkpoint the checkpoint file @@ -350,8 +350,9 @@ public void truncateFromEnd(long endOffset) { // * fsync latency could be huge on a disk glitch, which is not rare in spinning drives // * This method is called by ReplicaFetcher threads, which could block replica fetching // then causing ISR shrink or high produce response time degradation in remote scope on high fsync latency. - // - Even when stale epochs remained in LeaderEpoch file due to the unclean shutdown, it will be handled by - // another truncateFromEnd call on log loading procedure so it won't be a problem + // - We still flush the change in #assign synchronously, meaning that it's guaranteed that the checkpoint file always has no missing entries. + // * Even when stale epochs are restored from the checkpoint file after the unclean shutdown, it will be handled by + // another truncateFromEnd call on log loading procedure, so it won't be a problem scheduler.scheduleOnce("leader-epoch-cache-flush-" + topicPartition, this::writeToFile); log.debug("Cleared entries {} from epoch cache after truncating to end offset {}, leaving {} entries in the cache.", removedEntries, endOffset, epochs.size()); @@ -386,8 +387,9 @@ public void truncateFromStart(long startOffset) { // * fsync latency could be huge on a disk glitch, which is not rare in spinning drives // * This method is called as part of deleteRecords with holding UnifiedLog#lock. // - Meanwhile all produces against the partition will be blocked, which causes req-handlers to exhaust - // - Even when stale epochs remained in LeaderEpoch file due to the unclean shutdown, it will be recovered by - // another truncateFromStart call on log loading procedure so it won't be a problem + // - We still flush the change in #assign synchronously, meaning that it's guaranteed that the checkpoint file always has no missing entries. + // * Even when stale epochs are restored from the checkpoint file after the unclean shutdown, it will be handled by + // another truncateFromStart call on log loading procedure, so it won't be a problem scheduler.scheduleOnce("leader-epoch-cache-flush-" + topicPartition, this::writeToFile); log.debug("Cleared entries {} and rewrote first entry {} after truncating to start offset {}, leaving {} in the cache.", removedEntries, updatedFirstEntry, startOffset, epochs.size()); @@ -434,8 +436,8 @@ public LeaderEpochFileCache writeTo(LeaderEpochCheckpoint leaderEpochCheckpoint) /** * Returns a new LeaderEpochFileCache which contains same - * epoch entries with replacing backing checkpoint - * @param leaderEpochCheckpoint the new checkpoint + * epoch entries with replacing backing checkpoint file. + * @param leaderEpochCheckpoint the new checkpoint file * @return a new LeaderEpochFileCache instance */ public LeaderEpochFileCache withCheckpoint(LeaderEpochCheckpoint leaderEpochCheckpoint) { @@ -496,10 +498,9 @@ public NavigableMap epochWithOffsets() { private void writeToFile() { lock.readLock().lock(); - try { - checkpoint.write(epochs.values()); - } finally { - lock.readLock().unlock(); - } + List entries = new ArrayList<>(epochs.values()); + lock.readLock().unlock(); + + checkpoint.write(entries); } } From 0937e5a969f885429ed39699ac7222cd302435fb Mon Sep 17 00:00:00 2001 From: Haruki Okada Date: Thu, 23 May 2024 12:34:15 +0900 Subject: [PATCH 04/11] handle concurrent topic deletion --- ...tCheckpointFileWithFailureHandlerTest.scala | 16 ++++++++++++++-- .../CheckpointFileWithFailureHandler.java | 18 ++++++++++++++++++ .../checkpoint/LeaderEpochCheckpoint.java | 4 ++++ .../checkpoint/LeaderEpochCheckpointFile.java | 10 ++++++++++ .../internals/epoch/LeaderEpochFileCache.java | 15 ++++++++++++--- 5 files changed, 58 insertions(+), 5 deletions(-) diff --git a/core/src/test/scala/unit/kafka/server/checkpoints/OffsetCheckpointFileWithFailureHandlerTest.scala b/core/src/test/scala/unit/kafka/server/checkpoints/OffsetCheckpointFileWithFailureHandlerTest.scala index ddbf58d884e30..7808cedb075ee 100644 --- a/core/src/test/scala/unit/kafka/server/checkpoints/OffsetCheckpointFileWithFailureHandlerTest.scala +++ b/core/src/test/scala/unit/kafka/server/checkpoints/OffsetCheckpointFileWithFailureHandlerTest.scala @@ -19,12 +19,13 @@ package kafka.server.checkpoints import kafka.utils.{Logging, TestUtils} import org.apache.kafka.common.TopicPartition import org.apache.kafka.common.errors.KafkaStorageException -import org.apache.kafka.storage.internals.checkpoint.CheckpointFileWithFailureHandler -import org.apache.kafka.storage.internals.log.LogDirFailureChannel +import org.apache.kafka.storage.internals.checkpoint.{CheckpointFileWithFailureHandler, LeaderEpochCheckpointFile} +import org.apache.kafka.storage.internals.log.{EpochEntry, LogDirFailureChannel} import org.junit.jupiter.api.Assertions._ import org.junit.jupiter.api.Test import org.mockito.Mockito +import java.io.File import java.util.Collections import scala.collection.Map @@ -133,4 +134,15 @@ class OffsetCheckpointFileWithFailureHandlerTest extends Logging { assertThrows(classOf[IllegalArgumentException], () => lazyCheckpoints.fetch("/invalid/kafka-logs", new TopicPartition("foo", 0))) } + @Test + def testWriteIfDirExistsShouldNotThrowWhenDirNotExists(): Unit = { + val dir = TestUtils.tempDir() + val file = dir.toPath.resolve("test-checkpoint").toFile + val logDirFailureChannel = new LogDirFailureChannel(10) + val checkpointFile = new CheckpointFileWithFailureHandler(file, 0, + LeaderEpochCheckpointFile.FORMATTER, logDirFailureChannel, file.getParent) + + dir.renameTo(new File(dir.getAbsolutePath + "-renamed")) + checkpointFile.writeIfDirExists(Collections.singletonList(new EpochEntry(1, 42))) + } } diff --git a/storage/src/main/java/org/apache/kafka/storage/internals/checkpoint/CheckpointFileWithFailureHandler.java b/storage/src/main/java/org/apache/kafka/storage/internals/checkpoint/CheckpointFileWithFailureHandler.java index f780ced9b0477..64a593e483807 100644 --- a/storage/src/main/java/org/apache/kafka/storage/internals/checkpoint/CheckpointFileWithFailureHandler.java +++ b/storage/src/main/java/org/apache/kafka/storage/internals/checkpoint/CheckpointFileWithFailureHandler.java @@ -19,13 +19,19 @@ import org.apache.kafka.common.errors.KafkaStorageException; import org.apache.kafka.server.common.CheckpointFile; import org.apache.kafka.storage.internals.log.LogDirFailureChannel; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.io.File; +import java.io.FileNotFoundException; import java.io.IOException; +import java.nio.file.NoSuchFileException; import java.util.Collection; import java.util.List; public class CheckpointFileWithFailureHandler { + private static final Logger log = LoggerFactory.getLogger(CheckpointFileWithFailureHandler.class); + public final File file; private final LogDirFailureChannel logDirFailureChannel; @@ -51,6 +57,18 @@ public void write(Collection entries) { } } + public void writeIfDirExists(Collection entries) { + try { + checkpointFile.write(entries); + } catch (FileNotFoundException|NoSuchFileException e) { + log.warn("Failed to write to checkpoint file {}", file.getAbsolutePath(), e); + } catch (IOException e) { + String msg = "Error while writing to checkpoint file " + file.getAbsolutePath(); + logDirFailureChannel.maybeAddOfflineLogDir(logDir, msg, e); + throw new KafkaStorageException(msg, e); + } + } + public List read() { try { return checkpointFile.read(); diff --git a/storage/src/main/java/org/apache/kafka/storage/internals/checkpoint/LeaderEpochCheckpoint.java b/storage/src/main/java/org/apache/kafka/storage/internals/checkpoint/LeaderEpochCheckpoint.java index e66e82a0f64f7..29e5c75b97717 100644 --- a/storage/src/main/java/org/apache/kafka/storage/internals/checkpoint/LeaderEpochCheckpoint.java +++ b/storage/src/main/java/org/apache/kafka/storage/internals/checkpoint/LeaderEpochCheckpoint.java @@ -24,5 +24,9 @@ public interface LeaderEpochCheckpoint { void write(Collection epochs); + default void writeForTruncation(Collection epochs) { + write(epochs); + } + List read(); } diff --git a/storage/src/main/java/org/apache/kafka/storage/internals/checkpoint/LeaderEpochCheckpointFile.java b/storage/src/main/java/org/apache/kafka/storage/internals/checkpoint/LeaderEpochCheckpointFile.java index d387352df040f..6c6b2de2780a5 100644 --- a/storage/src/main/java/org/apache/kafka/storage/internals/checkpoint/LeaderEpochCheckpointFile.java +++ b/storage/src/main/java/org/apache/kafka/storage/internals/checkpoint/LeaderEpochCheckpointFile.java @@ -52,10 +52,20 @@ public LeaderEpochCheckpointFile(File file, LogDirFailureChannel logDirFailureCh checkpoint = new CheckpointFileWithFailureHandler<>(file, CURRENT_VERSION, FORMATTER, logDirFailureChannel, file.getParentFile().getParent()); } + @Override public void write(Collection epochs) { checkpoint.write(epochs); } + @Override + public void writeForTruncation(Collection epochs) { + // Writing epoch entries after truncation is done asynchronously for performance reasons. + // This could cause NoSuchFileException when the directory is renamed concurrently for topic deletion, + // so we use writeIfDirExists here. + checkpoint.writeIfDirExists(epochs); + } + + @Override public List read() { return checkpoint.read(); } diff --git a/storage/src/main/java/org/apache/kafka/storage/internals/epoch/LeaderEpochFileCache.java b/storage/src/main/java/org/apache/kafka/storage/internals/epoch/LeaderEpochFileCache.java index efdd0f381c0cb..9daa0b0f122ca 100644 --- a/storage/src/main/java/org/apache/kafka/storage/internals/epoch/LeaderEpochFileCache.java +++ b/storage/src/main/java/org/apache/kafka/storage/internals/epoch/LeaderEpochFileCache.java @@ -353,7 +353,7 @@ public void truncateFromEnd(long endOffset) { // - We still flush the change in #assign synchronously, meaning that it's guaranteed that the checkpoint file always has no missing entries. // * Even when stale epochs are restored from the checkpoint file after the unclean shutdown, it will be handled by // another truncateFromEnd call on log loading procedure, so it won't be a problem - scheduler.scheduleOnce("leader-epoch-cache-flush-" + topicPartition, this::writeToFile); + scheduler.scheduleOnce("leader-epoch-cache-flush-" + topicPartition, this::writeToFileForTruncation); log.debug("Cleared entries {} from epoch cache after truncating to end offset {}, leaving {} entries in the cache.", removedEntries, endOffset, epochs.size()); } @@ -390,7 +390,7 @@ public void truncateFromStart(long startOffset) { // - We still flush the change in #assign synchronously, meaning that it's guaranteed that the checkpoint file always has no missing entries. // * Even when stale epochs are restored from the checkpoint file after the unclean shutdown, it will be handled by // another truncateFromStart call on log loading procedure, so it won't be a problem - scheduler.scheduleOnce("leader-epoch-cache-flush-" + topicPartition, this::writeToFile); + scheduler.scheduleOnce("leader-epoch-cache-flush-" + topicPartition, this::writeToFileForTruncation); log.debug("Cleared entries {} and rewrote first entry {} after truncating to start offset {}, leaving {} in the cache.", removedEntries, updatedFirstEntry, startOffset, epochs.size()); } @@ -497,10 +497,19 @@ public NavigableMap epochWithOffsets() { } private void writeToFile() { + lock.readLock().lock(); + try { + checkpoint.write(epochs.values()); + } finally { + lock.readLock().unlock(); + } + } + + private void writeToFileForTruncation() { lock.readLock().lock(); List entries = new ArrayList<>(epochs.values()); lock.readLock().unlock(); - checkpoint.write(entries); + checkpoint.writeForTruncation(entries); } } From 7513f76a241f2133283c9c2fe52675953882c5d4 Mon Sep 17 00:00:00 2001 From: Haruki Okada Date: Thu, 23 May 2024 15:37:19 +0900 Subject: [PATCH 05/11] address checkstyle issues --- .../checkpoint/CheckpointFileWithFailureHandler.java | 2 +- .../storage/internals/epoch/LeaderEpochFileCache.java | 9 ++++++--- 2 files changed, 7 insertions(+), 4 deletions(-) diff --git a/storage/src/main/java/org/apache/kafka/storage/internals/checkpoint/CheckpointFileWithFailureHandler.java b/storage/src/main/java/org/apache/kafka/storage/internals/checkpoint/CheckpointFileWithFailureHandler.java index 64a593e483807..e5c24f0c44708 100644 --- a/storage/src/main/java/org/apache/kafka/storage/internals/checkpoint/CheckpointFileWithFailureHandler.java +++ b/storage/src/main/java/org/apache/kafka/storage/internals/checkpoint/CheckpointFileWithFailureHandler.java @@ -60,7 +60,7 @@ public void write(Collection entries) { public void writeIfDirExists(Collection entries) { try { checkpointFile.write(entries); - } catch (FileNotFoundException|NoSuchFileException e) { + } catch (FileNotFoundException | NoSuchFileException e) { log.warn("Failed to write to checkpoint file {}", file.getAbsolutePath(), e); } catch (IOException e) { String msg = "Error while writing to checkpoint file " + file.getAbsolutePath(); diff --git a/storage/src/main/java/org/apache/kafka/storage/internals/epoch/LeaderEpochFileCache.java b/storage/src/main/java/org/apache/kafka/storage/internals/epoch/LeaderEpochFileCache.java index 9daa0b0f122ca..8137e019ca380 100644 --- a/storage/src/main/java/org/apache/kafka/storage/internals/epoch/LeaderEpochFileCache.java +++ b/storage/src/main/java/org/apache/kafka/storage/internals/epoch/LeaderEpochFileCache.java @@ -506,10 +506,13 @@ private void writeToFile() { } private void writeToFileForTruncation() { + List entries; lock.readLock().lock(); - List entries = new ArrayList<>(epochs.values()); - lock.readLock().unlock(); - + try { + entries = new ArrayList<>(epochs.values()); + } finally { + lock.readLock().unlock(); + } checkpoint.writeForTruncation(entries); } } From c1def6ab90e57a74c1d6bb37aa079f4e6e89a831 Mon Sep 17 00:00:00 2001 From: Haruki Okada Date: Fri, 24 May 2024 07:46:58 +0900 Subject: [PATCH 06/11] address feedback --- .../internals/checkpoint/CheckpointFileWithFailureHandler.java | 2 +- .../kafka/storage/internals/epoch/LeaderEpochFileCache.java | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/storage/src/main/java/org/apache/kafka/storage/internals/checkpoint/CheckpointFileWithFailureHandler.java b/storage/src/main/java/org/apache/kafka/storage/internals/checkpoint/CheckpointFileWithFailureHandler.java index e5c24f0c44708..79963d79d2360 100644 --- a/storage/src/main/java/org/apache/kafka/storage/internals/checkpoint/CheckpointFileWithFailureHandler.java +++ b/storage/src/main/java/org/apache/kafka/storage/internals/checkpoint/CheckpointFileWithFailureHandler.java @@ -61,7 +61,7 @@ public void writeIfDirExists(Collection entries) { try { checkpointFile.write(entries); } catch (FileNotFoundException | NoSuchFileException e) { - log.warn("Failed to write to checkpoint file {}", file.getAbsolutePath(), e); + log.warn("Failed to write to checkpoint file {}. This is ok if the topic/partition is being deleted", file.getAbsolutePath(), e); } catch (IOException e) { String msg = "Error while writing to checkpoint file " + file.getAbsolutePath(); logDirFailureChannel.maybeAddOfflineLogDir(logDir, msg, e); diff --git a/storage/src/main/java/org/apache/kafka/storage/internals/epoch/LeaderEpochFileCache.java b/storage/src/main/java/org/apache/kafka/storage/internals/epoch/LeaderEpochFileCache.java index 8137e019ca380..d5a677c00ad70 100644 --- a/storage/src/main/java/org/apache/kafka/storage/internals/epoch/LeaderEpochFileCache.java +++ b/storage/src/main/java/org/apache/kafka/storage/internals/epoch/LeaderEpochFileCache.java @@ -75,7 +75,7 @@ public LeaderEpochFileCache(TopicPartition topicPartition, LeaderEpochCheckpoint /** * Instantiate a new LeaderEpochFileCache with provided epoch entries instead of from the backing checkpoint file. - * The provided epoch entries are expected to no less fresher than the checkpoint file. + * The provided epoch entries are expected to no less fresh than the checkpoint file. * @param epochEntries the current epoch entries * @param topicPartition the associated topic partition * @param checkpoint the checkpoint file From 482c80c2742662664600a2fe86830a9f2b3aefe2 Mon Sep 17 00:00:00 2001 From: Haruki Okada Date: Fri, 24 May 2024 08:14:58 +0900 Subject: [PATCH 07/11] address feedback --- .../kafka/storage/internals/epoch/LeaderEpochFileCache.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/storage/src/main/java/org/apache/kafka/storage/internals/epoch/LeaderEpochFileCache.java b/storage/src/main/java/org/apache/kafka/storage/internals/epoch/LeaderEpochFileCache.java index d5a677c00ad70..c0ad752355e19 100644 --- a/storage/src/main/java/org/apache/kafka/storage/internals/epoch/LeaderEpochFileCache.java +++ b/storage/src/main/java/org/apache/kafka/storage/internals/epoch/LeaderEpochFileCache.java @@ -75,7 +75,7 @@ public LeaderEpochFileCache(TopicPartition topicPartition, LeaderEpochCheckpoint /** * Instantiate a new LeaderEpochFileCache with provided epoch entries instead of from the backing checkpoint file. - * The provided epoch entries are expected to no less fresh than the checkpoint file. + * The provided epoch entries are expected to be no less fresh than the checkpoint file. * @param epochEntries the current epoch entries * @param topicPartition the associated topic partition * @param checkpoint the checkpoint file From 4f6332d1a50b363224be5910a322265e01dd1fdd Mon Sep 17 00:00:00 2001 From: Haruki Okada Date: Sat, 25 May 2024 11:40:17 +0900 Subject: [PATCH 08/11] RLM needs synchronous flush --- .../kafka/log/remote/RemoteLogManager.java | 4 +- core/src/main/scala/kafka/log/LogLoader.scala | 4 +- .../src/main/scala/kafka/log/UnifiedLog.scala | 30 +++++++++++-- .../log/remote/RemoteLogManagerTest.java | 4 +- .../scala/unit/kafka/log/LogLoaderTest.scala | 2 +- .../epoch/LeaderEpochFileCacheTest.scala | 38 ++++++++--------- .../internals/epoch/LeaderEpochFileCache.java | 42 ++++++++----------- 7 files changed, 70 insertions(+), 54 deletions(-) diff --git a/core/src/main/java/kafka/log/remote/RemoteLogManager.java b/core/src/main/java/kafka/log/remote/RemoteLogManager.java index 6555b7c0cda1a..6e74f713c8a9a 100644 --- a/core/src/main/java/kafka/log/remote/RemoteLogManager.java +++ b/core/src/main/java/kafka/log/remote/RemoteLogManager.java @@ -575,9 +575,9 @@ InMemoryLeaderEpochCheckpoint getLeaderEpochCheckpoint(UnifiedLog log, long star if (log.leaderEpochCache().isDefined()) { LeaderEpochFileCache cache = log.leaderEpochCache().get().writeTo(checkpoint); if (startOffset >= 0) { - cache.truncateFromStart(startOffset); + cache.truncateFromStart(startOffset, true); } - cache.truncateFromEnd(endOffset); + cache.truncateFromEnd(endOffset, true); } return checkpoint; } diff --git a/core/src/main/scala/kafka/log/LogLoader.scala b/core/src/main/scala/kafka/log/LogLoader.scala index b0f1fdd0e1ca2..79cb958a0d788 100644 --- a/core/src/main/scala/kafka/log/LogLoader.scala +++ b/core/src/main/scala/kafka/log/LogLoader.scala @@ -173,14 +173,14 @@ class LogLoader( } } - leaderEpochCache.ifPresent(_.truncateFromEnd(nextOffset)) + leaderEpochCache.ifPresent(_.truncateFromEnd(nextOffset, false)) val newLogStartOffset = if (isRemoteLogEnabled) { logStartOffsetCheckpoint } else { math.max(logStartOffsetCheckpoint, segments.firstSegment.get.baseOffset) } // The earliest leader epoch may not be flushed during a hard failure. Recover it here. - leaderEpochCache.ifPresent(_.truncateFromStart(logStartOffsetCheckpoint)) + leaderEpochCache.ifPresent(_.truncateFromStart(logStartOffsetCheckpoint, false)) // Any segment loading or recovery code must not use producerStateManager, so that we can build the full state here // from scratch. diff --git a/core/src/main/scala/kafka/log/UnifiedLog.scala b/core/src/main/scala/kafka/log/UnifiedLog.scala index 8302123fd1fb5..60cc2eaba6d18 100644 --- a/core/src/main/scala/kafka/log/UnifiedLog.scala +++ b/core/src/main/scala/kafka/log/UnifiedLog.scala @@ -1016,7 +1016,15 @@ class UnifiedLog(@volatile var logStartOffset: Long, updatedLogStartOffset = true updateLogStartOffset(newLogStartOffset) info(s"Incremented log start offset to $newLogStartOffset due to $reason") - leaderEpochCache.foreach(_.truncateFromStart(logStartOffset)) + // We flush the change to the device in the background because: + // - To avoid fsync latency + // * fsync latency could be huge on a disk glitch, which is not rare in spinning drives + // * This method is called as part of deleteRecords with holding UnifiedLog#lock. + // - Meanwhile all produces against the partition will be blocked, which causes req-handlers to exhaust + // - We still flush the change in #assign synchronously, meaning that it's guaranteed that the checkpoint file always has no missing entries. + // * Even when stale epochs are restored from the checkpoint file after the unclean shutdown, it will be handled by + // another truncateFromStart call on log loading procedure, so it won't be a problem + leaderEpochCache.foreach(_.truncateFromStart(logStartOffset, false)) producerStateManager.onLogStartOffsetIncremented(newLogStartOffset) maybeIncrementFirstUnstableOffset() } @@ -1807,7 +1815,15 @@ class UnifiedLog(@volatile var logStartOffset: Long, // and inserted the first start offset entry, but then failed to append any entries // before another leader was elected. lock synchronized { - leaderEpochCache.foreach(_.truncateFromEnd(logEndOffset)) + // We flush the change to the device in the background because: + // - To avoid fsync latency + // * fsync latency could be huge on a disk glitch, which is not rare in spinning drives + // * This method is called by ReplicaFetcher threads, which could block replica fetching + // then causing ISR shrink or high produce response time degradation in remote scope on high fsync latency. + // - We still flush the change in #assign synchronously, meaning that it's guaranteed that the checkpoint file always has no missing entries. + // * Even when stale epochs are restored from the checkpoint file after the unclean shutdown, it will be handled by + // another truncateFromEnd call on log loading procedure, so it won't be a problem + leaderEpochCache.foreach(_.truncateFromEnd(logEndOffset, false)) } false @@ -1820,7 +1836,15 @@ class UnifiedLog(@volatile var logStartOffset: Long, } else { val deletedSegments = localLog.truncateTo(targetOffset) deleteProducerSnapshots(deletedSegments, asyncDelete = true) - leaderEpochCache.foreach(_.truncateFromEnd(targetOffset)) + // We flush the change to the device in the background because: + // - To avoid fsync latency + // * fsync latency could be huge on a disk glitch, which is not rare in spinning drives + // * This method is called by ReplicaFetcher threads, which could block replica fetching + // then causing ISR shrink or high produce response time degradation in remote scope on high fsync latency. + // - We still flush the change in #assign synchronously, meaning that it's guaranteed that the checkpoint file always has no missing entries. + // * Even when stale epochs are restored from the checkpoint file after the unclean shutdown, it will be handled by + // another truncateFromEnd call on log loading procedure, so it won't be a problem + leaderEpochCache.foreach(_.truncateFromEnd(targetOffset, false)) logStartOffset = math.min(targetOffset, logStartOffset) rebuildProducerState(targetOffset, producerStateManager) if (highWatermark >= localLog.logEndOffset) diff --git a/core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java b/core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java index 01afc818821b7..e7ef344a45341 100644 --- a/core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java +++ b/core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java @@ -2308,8 +2308,8 @@ private Map truncateAndGetLeaderEpochs(List entries, InMemoryLeaderEpochCheckpoint myCheckpoint = new InMemoryLeaderEpochCheckpoint(); myCheckpoint.write(entries); LeaderEpochFileCache cache = new LeaderEpochFileCache(null, myCheckpoint, scheduler); - cache.truncateFromStart(startOffset); - cache.truncateFromEnd(endOffset); + cache.truncateFromStart(startOffset, true); + cache.truncateFromEnd(endOffset, true); return myCheckpoint.read().stream().collect(Collectors.toMap(e -> e.epoch, e -> e.startOffset)); } diff --git a/core/src/test/scala/unit/kafka/log/LogLoaderTest.scala b/core/src/test/scala/unit/kafka/log/LogLoaderTest.scala index f8cc1f01efce9..805fc671bf144 100644 --- a/core/src/test/scala/unit/kafka/log/LogLoaderTest.scala +++ b/core/src/test/scala/unit/kafka/log/LogLoaderTest.scala @@ -1393,7 +1393,7 @@ class LogLoaderTest { assertEquals(java.util.Arrays.asList(new EpochEntry(1, 0), new EpochEntry(2, 1), new EpochEntry(3, 3)), leaderEpochCache.epochEntries) // deliberately remove some of the epoch entries - leaderEpochCache.truncateFromEnd(2) + leaderEpochCache.truncateFromEnd(2, false) assertNotEquals(java.util.Arrays.asList(new EpochEntry(1, 0), new EpochEntry(2, 1), new EpochEntry(3, 3)), leaderEpochCache.epochEntries) log.close() diff --git a/core/src/test/scala/unit/kafka/server/epoch/LeaderEpochFileCacheTest.scala b/core/src/test/scala/unit/kafka/server/epoch/LeaderEpochFileCacheTest.scala index 925325d2c6efe..c6b5f0c7798a3 100644 --- a/core/src/test/scala/unit/kafka/server/epoch/LeaderEpochFileCacheTest.scala +++ b/core/src/test/scala/unit/kafka/server/epoch/LeaderEpochFileCacheTest.scala @@ -59,7 +59,7 @@ class LeaderEpochFileCacheTest { cache.assign(10, 20) assertEquals(OptionalInt.of(4), cache.previousEpoch) - cache.truncateFromEnd(18) + cache.truncateFromEnd(18, false) assertEquals(OptionalInt.of(2), cache.previousEpoch) } @@ -389,7 +389,7 @@ class LeaderEpochFileCacheTest { cache.assign(4, 11) //When clear latest on epoch boundary - cache.truncateFromEnd(8) + cache.truncateFromEnd(8, false) //Then should remove two latest epochs (remove is inclusive) assertEquals(java.util.Arrays.asList(new EpochEntry(2, 6)), cache.epochEntries) @@ -403,7 +403,7 @@ class LeaderEpochFileCacheTest { cache.assign(4, 11) //When reset to offset ON epoch boundary - cache.truncateFromStart(8) + cache.truncateFromStart(8, false) //Then should preserve (3, 8) assertEquals(java.util.Arrays.asList(new EpochEntry(3, 8), new EpochEntry(4, 11)), cache.epochEntries) @@ -417,7 +417,7 @@ class LeaderEpochFileCacheTest { cache.assign(4, 11) //When reset to offset BETWEEN epoch boundaries - cache.truncateFromStart(9) + cache.truncateFromStart(9, false) //Then we should retain epoch 3, but update it's offset to 9 as 8 has been removed assertEquals(java.util.Arrays.asList(new EpochEntry(3, 9), new EpochEntry(4, 11)), cache.epochEntries) @@ -431,7 +431,7 @@ class LeaderEpochFileCacheTest { cache.assign(4, 11) //When reset to offset before first epoch offset - cache.truncateFromStart(1) + cache.truncateFromStart(1, false) //Then nothing should change assertEquals(java.util.Arrays.asList(new EpochEntry(2, 6),new EpochEntry(3, 8), new EpochEntry(4, 11)), cache.epochEntries) @@ -445,7 +445,7 @@ class LeaderEpochFileCacheTest { cache.assign(4, 11) //When reset to offset on earliest epoch boundary - cache.truncateFromStart(6) + cache.truncateFromStart(6, false) //Then nothing should change assertEquals(java.util.Arrays.asList(new EpochEntry(2, 6),new EpochEntry(3, 8), new EpochEntry(4, 11)), cache.epochEntries) @@ -459,7 +459,7 @@ class LeaderEpochFileCacheTest { cache.assign(4, 11) //When - cache.truncateFromStart(11) + cache.truncateFromStart(11, false) //Then retain the last assertEquals(Collections.singletonList(new EpochEntry(4, 11)), cache.epochEntries) @@ -473,7 +473,7 @@ class LeaderEpochFileCacheTest { cache.assign(4, 11) //When we clear from a position between offset 8 & offset 11 - cache.truncateFromStart(9) + cache.truncateFromStart(9, false) //Then we should update the middle epoch entry's offset assertEquals(java.util.Arrays.asList(new EpochEntry(3, 9), new EpochEntry(4, 11)), cache.epochEntries) @@ -487,7 +487,7 @@ class LeaderEpochFileCacheTest { cache.assign(2, 10) //When we clear from a position between offset 0 & offset 7 - cache.truncateFromStart(5) + cache.truncateFromStart(5, false) //Then we should keep epoch 0 but update the offset appropriately assertEquals(java.util.Arrays.asList(new EpochEntry(0,5), new EpochEntry(1, 7), new EpochEntry(2, 10)), @@ -502,7 +502,7 @@ class LeaderEpochFileCacheTest { cache.assign(4, 11) //When reset to offset beyond last epoch - cache.truncateFromStart(15) + cache.truncateFromStart(15, false) //Then update the last assertEquals(Collections.singletonList(new EpochEntry(4, 15)), cache.epochEntries) @@ -516,7 +516,7 @@ class LeaderEpochFileCacheTest { cache.assign(4, 11) //When reset to offset BETWEEN epoch boundaries - cache.truncateFromEnd( 9) + cache.truncateFromEnd(9, false) //Then should keep the preceding epochs assertEquals(OptionalInt.of(3), cache.latestEpoch) @@ -545,7 +545,7 @@ class LeaderEpochFileCacheTest { cache.assign(4, 11) //When reset to offset on epoch boundary - cache.truncateFromStart(UNDEFINED_EPOCH_OFFSET) + cache.truncateFromStart(UNDEFINED_EPOCH_OFFSET, false) //Then should do nothing assertEquals(3, cache.epochEntries.size) @@ -559,7 +559,7 @@ class LeaderEpochFileCacheTest { cache.assign(4, 11) //When reset to offset on epoch boundary - cache.truncateFromEnd(UNDEFINED_EPOCH_OFFSET) + cache.truncateFromEnd(UNDEFINED_EPOCH_OFFSET, false) //Then should do nothing assertEquals(3, cache.epochEntries.size) @@ -580,13 +580,13 @@ class LeaderEpochFileCacheTest { @Test def shouldClearEarliestOnEmptyCache(): Unit = { //Then - cache.truncateFromStart(7) + cache.truncateFromStart(7, false) } @Test def shouldClearLatestOnEmptyCache(): Unit = { //Then - cache.truncateFromEnd(7) + cache.truncateFromEnd(7, false) } @Test @@ -602,7 +602,7 @@ class LeaderEpochFileCacheTest { cache.assign(10, 20) assertEquals(OptionalInt.of(4), cache.previousEpoch(10)) - cache.truncateFromEnd(18) + cache.truncateFromEnd(18, false) assertEquals(OptionalInt.of(2), cache.previousEpoch(cache.latestEpoch.getAsInt)) } @@ -619,7 +619,7 @@ class LeaderEpochFileCacheTest { cache.assign(10, 20) assertEquals(Optional.of(new EpochEntry(4, 15)), cache.previousEntry(10)) - cache.truncateFromEnd(18) + cache.truncateFromEnd(18, false) assertEquals(Optional.of(new EpochEntry(2, 10)), cache.previousEntry(cache.latestEpoch.getAsInt)) } @@ -666,8 +666,8 @@ class LeaderEpochFileCacheTest { cache.assign(3, 8) cache.assign(4, 11) - cache.truncateFromEnd(11) - cache.truncateFromStart(8) + cache.truncateFromEnd(11, false) + cache.truncateFromStart(8, false) assertEquals(List(new EpochEntry(3, 8)).asJava, checkpoint.read()) } diff --git a/storage/src/main/java/org/apache/kafka/storage/internals/epoch/LeaderEpochFileCache.java b/storage/src/main/java/org/apache/kafka/storage/internals/epoch/LeaderEpochFileCache.java index c0ad752355e19..d84f191db2723 100644 --- a/storage/src/main/java/org/apache/kafka/storage/internals/epoch/LeaderEpochFileCache.java +++ b/storage/src/main/java/org/apache/kafka/storage/internals/epoch/LeaderEpochFileCache.java @@ -44,7 +44,7 @@ * Leader Epoch = epoch assigned to each leader by the controller. * Offset = offset of the first message in each epoch. *

- * Note that {@link #truncateFromStart},{@link #truncateFromEnd} flush the epoch-entry changes to checkpoint asynchronously. + * Note that {@link #truncateFromStart},{@link #truncateFromEnd} may flush the epoch-entry changes to checkpoint asynchronously. * Hence, it is instantiater's responsibility to ensure restoring the cache to the correct state after instantiating * this class from checkpoint (which might contain stale epoch entries right after instantiation). */ @@ -335,25 +335,22 @@ public Map.Entry endOffsetFor(int requestedEpoch, long logEndOffs /** * Removes all epoch entries from the store with start offsets greater than or equal to the passed offset. - *

- * Checkpoint-flushing is done asynchronously. + * + * @param endOffset the offset to clear up to + * @param flushSync if true, the method will block until the changes are flushed to file */ - public void truncateFromEnd(long endOffset) { + public void truncateFromEnd(long endOffset, boolean flushSync) { lock.writeLock().lock(); try { Optional epochEntry = latestEntry(); if (endOffset >= 0 && epochEntry.isPresent() && epochEntry.get().startOffset >= endOffset) { List removedEntries = removeFromEnd(x -> x.startOffset >= endOffset); - // We flush the change to the device in the background because: - // - To avoid fsync latency - // * fsync latency could be huge on a disk glitch, which is not rare in spinning drives - // * This method is called by ReplicaFetcher threads, which could block replica fetching - // then causing ISR shrink or high produce response time degradation in remote scope on high fsync latency. - // - We still flush the change in #assign synchronously, meaning that it's guaranteed that the checkpoint file always has no missing entries. - // * Even when stale epochs are restored from the checkpoint file after the unclean shutdown, it will be handled by - // another truncateFromEnd call on log loading procedure, so it won't be a problem - scheduler.scheduleOnce("leader-epoch-cache-flush-" + topicPartition, this::writeToFileForTruncation); + if (flushSync) { + writeToFileForTruncation(); + } else { + scheduler.scheduleOnce("leader-epoch-cache-flush-" + topicPartition, this::writeToFileForTruncation); + } log.debug("Cleared entries {} from epoch cache after truncating to end offset {}, leaving {} entries in the cache.", removedEntries, endOffset, epochs.size()); } @@ -367,12 +364,11 @@ public void truncateFromEnd(long endOffset) { * be offset, then clears any previous epoch entries. *

* This method is exclusive: so truncateFromStart(6) will retain an entry at offset 6. - *

- * Checkpoint-flushing is done asynchronously. * * @param startOffset the offset to clear up to + * @param flushSync if true, the method will block until the changes are flushed to file */ - public void truncateFromStart(long startOffset) { + public void truncateFromStart(long startOffset, boolean flushSync) { lock.writeLock().lock(); try { List removedEntries = removeFromStart(entry -> entry.startOffset <= startOffset); @@ -382,15 +378,11 @@ public void truncateFromStart(long startOffset) { EpochEntry updatedFirstEntry = new EpochEntry(firstBeforeStartOffset.epoch, startOffset); epochs.put(updatedFirstEntry.epoch, updatedFirstEntry); - // We flush the change to the device in the background because: - // - To avoid fsync latency - // * fsync latency could be huge on a disk glitch, which is not rare in spinning drives - // * This method is called as part of deleteRecords with holding UnifiedLog#lock. - // - Meanwhile all produces against the partition will be blocked, which causes req-handlers to exhaust - // - We still flush the change in #assign synchronously, meaning that it's guaranteed that the checkpoint file always has no missing entries. - // * Even when stale epochs are restored from the checkpoint file after the unclean shutdown, it will be handled by - // another truncateFromStart call on log loading procedure, so it won't be a problem - scheduler.scheduleOnce("leader-epoch-cache-flush-" + topicPartition, this::writeToFileForTruncation); + if (flushSync) { + writeToFileForTruncation(); + } else { + scheduler.scheduleOnce("leader-epoch-cache-flush-" + topicPartition, this::writeToFileForTruncation); + } log.debug("Cleared entries {} and rewrote first entry {} after truncating to start offset {}, leaving {} in the cache.", removedEntries, updatedFirstEntry, startOffset, epochs.size()); } From 22be0ba9a8cfc934291f75175d4f7717f661583e Mon Sep 17 00:00:00 2001 From: Haruki Okada Date: Mon, 3 Jun 2024 13:48:57 +0900 Subject: [PATCH 09/11] fix indirect truncation for RLM --- .../kafka/log/remote/RemoteLogManager.java | 46 ++++-- core/src/main/scala/kafka/log/LogLoader.scala | 4 +- .../src/main/scala/kafka/log/UnifiedLog.scala | 30 +--- .../log/remote/RemoteLogManagerTest.java | 61 ++++---- .../scala/unit/kafka/log/LogLoaderTest.scala | 2 +- .../scala/unit/kafka/log/LogSegmentTest.scala | 13 +- .../InMemoryLeaderEpochCheckpointTest.scala | 58 -------- .../epoch/LeaderEpochFileCacheTest.scala | 47 +++--- .../InMemoryLeaderEpochCheckpoint.java | 63 -------- .../checkpoint/LeaderEpochCheckpoint.java | 32 ----- .../checkpoint/LeaderEpochCheckpointFile.java | 5 +- .../internals/epoch/LeaderEpochFileCache.java | 134 +++++++++++------- 12 files changed, 181 insertions(+), 314 deletions(-) delete mode 100644 core/src/test/scala/unit/kafka/server/checkpoints/InMemoryLeaderEpochCheckpointTest.scala delete mode 100644 storage/src/main/java/org/apache/kafka/storage/internals/checkpoint/InMemoryLeaderEpochCheckpoint.java delete mode 100644 storage/src/main/java/org/apache/kafka/storage/internals/checkpoint/LeaderEpochCheckpoint.java diff --git a/core/src/main/java/kafka/log/remote/RemoteLogManager.java b/core/src/main/java/kafka/log/remote/RemoteLogManager.java index 6e74f713c8a9a..7ba602f9c9b64 100644 --- a/core/src/main/java/kafka/log/remote/RemoteLogManager.java +++ b/core/src/main/java/kafka/log/remote/RemoteLogManager.java @@ -43,6 +43,7 @@ import org.apache.kafka.common.utils.LogContext; import org.apache.kafka.common.utils.Time; import org.apache.kafka.common.utils.Utils; +import org.apache.kafka.server.common.CheckpointFile; import org.apache.kafka.server.common.OffsetAndEpoch; import org.apache.kafka.server.log.remote.metadata.storage.ClassLoaderAwareRemoteLogMetadataManager; import org.apache.kafka.server.log.remote.storage.ClassLoaderAwareRemoteStorageManager; @@ -57,7 +58,7 @@ import org.apache.kafka.server.log.remote.storage.RemoteStorageException; import org.apache.kafka.server.log.remote.storage.RemoteStorageManager; import org.apache.kafka.server.metrics.KafkaMetricsGroup; -import org.apache.kafka.storage.internals.checkpoint.InMemoryLeaderEpochCheckpoint; +import org.apache.kafka.storage.internals.checkpoint.LeaderEpochCheckpointFile; import org.apache.kafka.storage.internals.epoch.LeaderEpochFileCache; import org.apache.kafka.storage.internals.log.AbortedTxn; import org.apache.kafka.storage.internals.log.EpochEntry; @@ -79,12 +80,16 @@ import scala.Option; import scala.collection.JavaConverters; +import java.io.BufferedWriter; +import java.io.ByteArrayOutputStream; import java.io.Closeable; import java.io.File; import java.io.IOException; import java.io.InputStream; +import java.io.OutputStreamWriter; import java.lang.reflect.InvocationTargetException; import java.nio.ByteBuffer; +import java.nio.charset.StandardCharsets; import java.nio.file.Path; import java.security.PrivilegedAction; import java.util.ArrayList; @@ -561,25 +566,23 @@ public boolean isCancelled() { } /** - * Returns the leader epoch checkpoint by truncating with the given start[exclusive] and end[inclusive] offset + * Returns the leader epoch entries within the range of the given start[exclusive] and end[inclusive] offset. + *

+ * Visible for testing. * * @param log The actual log from where to take the leader-epoch checkpoint - * @param startOffset The start offset of the checkpoint file (exclusive in the truncation). + * @param startOffset The start offset of the epoch entries (exclusive). * If start offset is 6, then it will retain an entry at offset 6. - * @param endOffset The end offset of the checkpoint file (inclusive in the truncation) + * @param endOffset The end offset of the epoch entries (inclusive) * If end offset is 100, then it will remove the entries greater than or equal to 100. - * @return the truncated leader epoch checkpoint + * @return the leader epoch entries */ - InMemoryLeaderEpochCheckpoint getLeaderEpochCheckpoint(UnifiedLog log, long startOffset, long endOffset) { - InMemoryLeaderEpochCheckpoint checkpoint = new InMemoryLeaderEpochCheckpoint(); + List getLeaderEpochEntries(UnifiedLog log, long startOffset, long endOffset) { if (log.leaderEpochCache().isDefined()) { - LeaderEpochFileCache cache = log.leaderEpochCache().get().writeTo(checkpoint); - if (startOffset >= 0) { - cache.truncateFromStart(startOffset, true); - } - cache.truncateFromEnd(endOffset, true); + return log.leaderEpochCache().get().epochEntriesInRange(startOffset, endOffset); + } else { + return Collections.emptyList(); } - return checkpoint; } class RLMTask extends CancellableRunnable { @@ -736,7 +739,7 @@ private void copyLogSegment(UnifiedLog log, LogSegment segment, long nextSegment long endOffset = nextSegmentBaseOffset - 1; File producerStateSnapshotFile = log.producerStateManager().fetchSnapshot(nextSegmentBaseOffset).orElse(null); - List epochEntries = getLeaderEpochCheckpoint(log, segment.baseOffset(), nextSegmentBaseOffset).read(); + List epochEntries = getLeaderEpochEntries(log, segment.baseOffset(), nextSegmentBaseOffset); Map segmentLeaderEpochs = new HashMap<>(epochEntries.size()); epochEntries.forEach(entry -> segmentLeaderEpochs.put(entry.epoch, entry.startOffset)); @@ -746,7 +749,7 @@ private void copyLogSegment(UnifiedLog log, LogSegment segment, long nextSegment remoteLogMetadataManager.addRemoteLogSegmentMetadata(copySegmentStartedRlsm).get(); - ByteBuffer leaderEpochsIndex = getLeaderEpochCheckpoint(log, -1, nextSegmentBaseOffset).readAsByteBuffer(); + ByteBuffer leaderEpochsIndex = epochEntriesAsByteBuffer(getLeaderEpochEntries(log, -1, nextSegmentBaseOffset)); LogSegmentData segmentData = new LogSegmentData(logFile.toPath(), toPathIfExists(segment.offsetIndex().file()), toPathIfExists(segment.timeIndex().file()), Optional.ofNullable(toPathIfExists(segment.txnIndex().file())), producerStateSnapshotFile.toPath(), leaderEpochsIndex); @@ -1692,6 +1695,19 @@ private static void shutdownAndAwaitTermination(ExecutorService pool, String poo LOGGER.info("Shutting down of thread pool {} is completed", poolName); } + //Visible for testing + static ByteBuffer epochEntriesAsByteBuffer(List epochEntries) throws IOException { + ByteArrayOutputStream stream = new ByteArrayOutputStream(); + try (BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(stream, StandardCharsets.UTF_8))) { + CheckpointFile.CheckpointWriteBuffer writeBuffer = + new CheckpointFile.CheckpointWriteBuffer<>(writer, 0, LeaderEpochCheckpointFile.FORMATTER); + writeBuffer.write(epochEntries); + writer.flush(); + } + + return ByteBuffer.wrap(stream.toByteArray()); + } + private void removeRemoteTopicPartitionMetrics(TopicIdPartition topicIdPartition) { String topic = topicIdPartition.topic(); if (!brokerTopicStats.isTopicStatsExisted(topicIdPartition.topic())) { diff --git a/core/src/main/scala/kafka/log/LogLoader.scala b/core/src/main/scala/kafka/log/LogLoader.scala index 79cb958a0d788..b0f1fdd0e1ca2 100644 --- a/core/src/main/scala/kafka/log/LogLoader.scala +++ b/core/src/main/scala/kafka/log/LogLoader.scala @@ -173,14 +173,14 @@ class LogLoader( } } - leaderEpochCache.ifPresent(_.truncateFromEnd(nextOffset, false)) + leaderEpochCache.ifPresent(_.truncateFromEnd(nextOffset)) val newLogStartOffset = if (isRemoteLogEnabled) { logStartOffsetCheckpoint } else { math.max(logStartOffsetCheckpoint, segments.firstSegment.get.baseOffset) } // The earliest leader epoch may not be flushed during a hard failure. Recover it here. - leaderEpochCache.ifPresent(_.truncateFromStart(logStartOffsetCheckpoint, false)) + leaderEpochCache.ifPresent(_.truncateFromStart(logStartOffsetCheckpoint)) // Any segment loading or recovery code must not use producerStateManager, so that we can build the full state here // from scratch. diff --git a/core/src/main/scala/kafka/log/UnifiedLog.scala b/core/src/main/scala/kafka/log/UnifiedLog.scala index 60cc2eaba6d18..8302123fd1fb5 100644 --- a/core/src/main/scala/kafka/log/UnifiedLog.scala +++ b/core/src/main/scala/kafka/log/UnifiedLog.scala @@ -1016,15 +1016,7 @@ class UnifiedLog(@volatile var logStartOffset: Long, updatedLogStartOffset = true updateLogStartOffset(newLogStartOffset) info(s"Incremented log start offset to $newLogStartOffset due to $reason") - // We flush the change to the device in the background because: - // - To avoid fsync latency - // * fsync latency could be huge on a disk glitch, which is not rare in spinning drives - // * This method is called as part of deleteRecords with holding UnifiedLog#lock. - // - Meanwhile all produces against the partition will be blocked, which causes req-handlers to exhaust - // - We still flush the change in #assign synchronously, meaning that it's guaranteed that the checkpoint file always has no missing entries. - // * Even when stale epochs are restored from the checkpoint file after the unclean shutdown, it will be handled by - // another truncateFromStart call on log loading procedure, so it won't be a problem - leaderEpochCache.foreach(_.truncateFromStart(logStartOffset, false)) + leaderEpochCache.foreach(_.truncateFromStart(logStartOffset)) producerStateManager.onLogStartOffsetIncremented(newLogStartOffset) maybeIncrementFirstUnstableOffset() } @@ -1815,15 +1807,7 @@ class UnifiedLog(@volatile var logStartOffset: Long, // and inserted the first start offset entry, but then failed to append any entries // before another leader was elected. lock synchronized { - // We flush the change to the device in the background because: - // - To avoid fsync latency - // * fsync latency could be huge on a disk glitch, which is not rare in spinning drives - // * This method is called by ReplicaFetcher threads, which could block replica fetching - // then causing ISR shrink or high produce response time degradation in remote scope on high fsync latency. - // - We still flush the change in #assign synchronously, meaning that it's guaranteed that the checkpoint file always has no missing entries. - // * Even when stale epochs are restored from the checkpoint file after the unclean shutdown, it will be handled by - // another truncateFromEnd call on log loading procedure, so it won't be a problem - leaderEpochCache.foreach(_.truncateFromEnd(logEndOffset, false)) + leaderEpochCache.foreach(_.truncateFromEnd(logEndOffset)) } false @@ -1836,15 +1820,7 @@ class UnifiedLog(@volatile var logStartOffset: Long, } else { val deletedSegments = localLog.truncateTo(targetOffset) deleteProducerSnapshots(deletedSegments, asyncDelete = true) - // We flush the change to the device in the background because: - // - To avoid fsync latency - // * fsync latency could be huge on a disk glitch, which is not rare in spinning drives - // * This method is called by ReplicaFetcher threads, which could block replica fetching - // then causing ISR shrink or high produce response time degradation in remote scope on high fsync latency. - // - We still flush the change in #assign synchronously, meaning that it's guaranteed that the checkpoint file always has no missing entries. - // * Even when stale epochs are restored from the checkpoint file after the unclean shutdown, it will be handled by - // another truncateFromEnd call on log loading procedure, so it won't be a problem - leaderEpochCache.foreach(_.truncateFromEnd(targetOffset, false)) + leaderEpochCache.foreach(_.truncateFromEnd(targetOffset)) logStartOffset = math.min(targetOffset, logStartOffset) rebuildProducerState(targetOffset, producerStateManager) if (highWatermark >= localLog.logEndOffset) diff --git a/core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java b/core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java index e7ef344a45341..7fda9c6e37cb0 100644 --- a/core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java +++ b/core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java @@ -58,14 +58,14 @@ import org.apache.kafka.server.metrics.KafkaMetricsGroup; import org.apache.kafka.server.metrics.KafkaYammerMetrics; import org.apache.kafka.server.util.MockScheduler; -import org.apache.kafka.storage.internals.checkpoint.InMemoryLeaderEpochCheckpoint; -import org.apache.kafka.storage.internals.checkpoint.LeaderEpochCheckpoint; +import org.apache.kafka.storage.internals.checkpoint.LeaderEpochCheckpointFile; import org.apache.kafka.storage.internals.epoch.LeaderEpochFileCache; import org.apache.kafka.storage.internals.log.EpochEntry; import org.apache.kafka.storage.internals.log.FetchDataInfo; import org.apache.kafka.storage.internals.log.FetchIsolation; import org.apache.kafka.storage.internals.log.LazyIndex; import org.apache.kafka.storage.internals.log.LogConfig; +import org.apache.kafka.storage.internals.log.LogDirFailureChannel; import org.apache.kafka.storage.internals.log.LogFileUtils; import org.apache.kafka.storage.internals.log.LogSegment; import org.apache.kafka.storage.internals.log.OffsetIndex; @@ -87,16 +87,19 @@ import scala.Option; import scala.collection.JavaConverters; +import java.io.BufferedReader; import java.io.ByteArrayInputStream; import java.io.File; import java.io.InputStream; import java.io.FileInputStream; import java.io.IOException; +import java.io.InputStreamReader; +import java.io.UncheckedIOException; import java.nio.ByteBuffer; +import java.nio.charset.StandardCharsets; import java.nio.file.Files; import java.util.ArrayList; import java.util.Arrays; -import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; @@ -188,19 +191,7 @@ public class RemoteLogManagerTest { private final EpochEntry epochEntry1 = new EpochEntry(1, 100); private final EpochEntry epochEntry2 = new EpochEntry(2, 200); private final List totalEpochEntries = Arrays.asList(epochEntry0, epochEntry1, epochEntry2); - private final LeaderEpochCheckpoint checkpoint = new LeaderEpochCheckpoint() { - List epochs = Collections.emptyList(); - - @Override - public void write(Collection epochs) { - this.epochs = new ArrayList<>(epochs); - } - - @Override - public List read() { - return epochs; - } - }; + private LeaderEpochCheckpointFile checkpoint; private final AtomicLong currentLogStartOffset = new AtomicLong(0L); private final UnifiedLog mockLog = mock(UnifiedLog.class); @@ -209,6 +200,7 @@ public List read() { @BeforeEach void setUp() throws Exception { + checkpoint = new LeaderEpochCheckpointFile(TestUtils.tempFile(), new LogDirFailureChannel(1)); topicIds.put(leaderTopicIdPartition.topicPartition().topic(), leaderTopicIdPartition.topicId()); topicIds.put(followerTopicIdPartition.topicPartition().topic(), followerTopicIdPartition.topicId()); Properties props = kafka.utils.TestUtils.createDummyBrokerConfig(); @@ -248,11 +240,9 @@ void testGetLeaderEpochCheckpoint() { checkpoint.write(totalEpochEntries); LeaderEpochFileCache cache = new LeaderEpochFileCache(tp, checkpoint, scheduler); when(mockLog.leaderEpochCache()).thenReturn(Option.apply(cache)); - InMemoryLeaderEpochCheckpoint inMemoryCheckpoint = remoteLogManager.getLeaderEpochCheckpoint(mockLog, 0, 300); - assertEquals(totalEpochEntries, inMemoryCheckpoint.read()); + assertEquals(totalEpochEntries, remoteLogManager.getLeaderEpochEntries(mockLog, 0, 300)); - InMemoryLeaderEpochCheckpoint inMemoryCheckpoint2 = remoteLogManager.getLeaderEpochCheckpoint(mockLog, 100, 200); - List epochEntries = inMemoryCheckpoint2.read(); + List epochEntries = remoteLogManager.getLeaderEpochEntries(mockLog, 100, 200); assertEquals(1, epochEntries.size()); assertEquals(epochEntry1, epochEntries.get(0)); } @@ -1124,9 +1114,7 @@ private void verifyLogSegmentData(LogSegmentData logSegmentData, assertEquals(tempFile.getAbsolutePath(), logSegmentData.logSegment().toAbsolutePath().toString()); assertEquals(mockProducerSnapshotIndex.getAbsolutePath(), logSegmentData.producerSnapshotIndex().toAbsolutePath().toString()); - InMemoryLeaderEpochCheckpoint inMemoryLeaderEpochCheckpoint = new InMemoryLeaderEpochCheckpoint(); - inMemoryLeaderEpochCheckpoint.write(expectedLeaderEpoch); - assertEquals(inMemoryLeaderEpochCheckpoint.readAsByteBuffer(), logSegmentData.leaderEpochIndex()); + assertEquals(RemoteLogManager.epochEntriesAsByteBuffer(expectedLeaderEpoch), logSegmentData.leaderEpochIndex()); } @Test @@ -2305,11 +2293,17 @@ private List listRemoteLogSegmentMetadataByTime(TopicI private Map truncateAndGetLeaderEpochs(List entries, Long startOffset, Long endOffset) { - InMemoryLeaderEpochCheckpoint myCheckpoint = new InMemoryLeaderEpochCheckpoint(); + LeaderEpochCheckpointFile myCheckpoint; + try { + myCheckpoint = new LeaderEpochCheckpointFile( + TestUtils.tempFile(), new LogDirFailureChannel(1)); + } catch (IOException e) { + throw new UncheckedIOException(e); + } myCheckpoint.write(entries); LeaderEpochFileCache cache = new LeaderEpochFileCache(null, myCheckpoint, scheduler); - cache.truncateFromStart(startOffset, true); - cache.truncateFromEnd(endOffset, true); + cache.truncateFromStart(startOffset); + cache.truncateFromEnd(endOffset); return myCheckpoint.read().stream().collect(Collectors.toMap(e -> e.epoch, e -> e.startOffset)); } @@ -2534,6 +2528,21 @@ int lookupPositionForOffset(RemoteLogSegmentMetadata remoteLogSegmentMetadata, l } } + @Test + public void testEpochEntriesAsByteBuffer() throws Exception { + int expectedEpoch = 0; + long expectedStartOffset = 1L; + int expectedVersion = 0; + List epochs = Arrays.asList(new EpochEntry(expectedEpoch, expectedStartOffset)); + ByteBuffer buffer = RemoteLogManager.epochEntriesAsByteBuffer(epochs); + BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(new ByteArrayInputStream(buffer.array()), StandardCharsets.UTF_8)); + + assertEquals(String.valueOf(expectedVersion), bufferedReader.readLine()); + assertEquals(String.valueOf(epochs.size()), bufferedReader.readLine()); + assertEquals(expectedEpoch + " " + expectedStartOffset, bufferedReader.readLine()); + } + + private Partition mockPartition(TopicIdPartition topicIdPartition) { TopicPartition tp = topicIdPartition.topicPartition(); Partition partition = mock(Partition.class); diff --git a/core/src/test/scala/unit/kafka/log/LogLoaderTest.scala b/core/src/test/scala/unit/kafka/log/LogLoaderTest.scala index 805fc671bf144..f8cc1f01efce9 100644 --- a/core/src/test/scala/unit/kafka/log/LogLoaderTest.scala +++ b/core/src/test/scala/unit/kafka/log/LogLoaderTest.scala @@ -1393,7 +1393,7 @@ class LogLoaderTest { assertEquals(java.util.Arrays.asList(new EpochEntry(1, 0), new EpochEntry(2, 1), new EpochEntry(3, 3)), leaderEpochCache.epochEntries) // deliberately remove some of the epoch entries - leaderEpochCache.truncateFromEnd(2, false) + leaderEpochCache.truncateFromEnd(2) assertNotEquals(java.util.Arrays.asList(new EpochEntry(1, 0), new EpochEntry(2, 1), new EpochEntry(3, 3)), leaderEpochCache.epochEntries) log.close() diff --git a/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala b/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala index bac6a0808bc06..decbc09382b1a 100644 --- a/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala +++ b/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala @@ -25,7 +25,7 @@ import org.apache.kafka.common.record._ import org.apache.kafka.common.utils.{MockTime, Time, Utils} import org.apache.kafka.coordinator.transaction.TransactionLogConfigs import org.apache.kafka.server.util.MockScheduler -import org.apache.kafka.storage.internals.checkpoint.LeaderEpochCheckpoint +import org.apache.kafka.storage.internals.checkpoint.LeaderEpochCheckpointFile import org.apache.kafka.storage.internals.epoch.LeaderEpochFileCache import org.apache.kafka.storage.internals.log._ import org.junit.jupiter.api.Assertions._ @@ -34,7 +34,6 @@ import org.junit.jupiter.params.ParameterizedTest import org.junit.jupiter.params.provider.CsvSource import java.io.{File, RandomAccessFile} -import java.util import java.util.{Optional, OptionalLong} import scala.collection._ import scala.jdk.CollectionConverters._ @@ -404,15 +403,7 @@ class LogSegmentTest { def testRecoveryRebuildsEpochCache(): Unit = { val seg = createSegment(0) - val checkpoint: LeaderEpochCheckpoint = new LeaderEpochCheckpoint { - private var epochs = Seq.empty[EpochEntry] - - override def write(epochs: util.Collection[EpochEntry]): Unit = { - this.epochs = epochs.asScala.toSeq - } - - override def read(): java.util.List[EpochEntry] = this.epochs.asJava - } + val checkpoint: LeaderEpochCheckpointFile = new LeaderEpochCheckpointFile(TestUtils.tempFile(), new LogDirFailureChannel(1)) val cache = new LeaderEpochFileCache(topicPartition, checkpoint, new MockScheduler(new MockTime())) seg.append(105L, RecordBatch.NO_TIMESTAMP, 104L, MemoryRecords.withRecords(104L, Compression.NONE, 0, diff --git a/core/src/test/scala/unit/kafka/server/checkpoints/InMemoryLeaderEpochCheckpointTest.scala b/core/src/test/scala/unit/kafka/server/checkpoints/InMemoryLeaderEpochCheckpointTest.scala deleted file mode 100644 index 3af126f5c5529..0000000000000 --- a/core/src/test/scala/unit/kafka/server/checkpoints/InMemoryLeaderEpochCheckpointTest.scala +++ /dev/null @@ -1,58 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package kafka.server.checkpoints - -import org.apache.kafka.storage.internals.checkpoint.InMemoryLeaderEpochCheckpoint -import org.apache.kafka.storage.internals.log.EpochEntry -import org.junit.jupiter.api.Assertions.assertEquals -import org.junit.jupiter.api.Test - -import java.io.{BufferedReader, ByteArrayInputStream, InputStreamReader} -import java.nio.charset.StandardCharsets - -class InMemoryLeaderEpochCheckpointTest { - - @Test - def shouldAppendNewEntry(): Unit = { - val checkpoint = new InMemoryLeaderEpochCheckpoint() - val epochs = java.util.Arrays.asList(new EpochEntry(0, 1L), new EpochEntry(1, 2L), new EpochEntry(2, 3L)) - checkpoint.write(epochs) - assertEquals(epochs, checkpoint.read()) - - val epochs2 = java.util.Arrays.asList(new EpochEntry(3, 4L), new EpochEntry(4, 5L)) - checkpoint.write(epochs2) - - assertEquals(epochs2, checkpoint.read()) - } - - @Test - def testReadAsByteBuffer(): Unit = { - val checkpoint = new InMemoryLeaderEpochCheckpoint() - val expectedEpoch = 0 - val expectedStartOffset = 1L - val expectedVersion = 0 - val epochs = java.util.Arrays.asList(new EpochEntry(expectedEpoch, expectedStartOffset)) - checkpoint.write(epochs) - assertEquals(epochs, checkpoint.read()) - val buffer = checkpoint.readAsByteBuffer() - - val bufferedReader = new BufferedReader(new InputStreamReader(new ByteArrayInputStream(buffer.array()), StandardCharsets.UTF_8)) - assertEquals(expectedVersion.toString, bufferedReader.readLine()) - assertEquals(epochs.size().toString, bufferedReader.readLine()) - assertEquals(s"$expectedEpoch $expectedStartOffset", bufferedReader.readLine()) - } -} diff --git a/core/src/test/scala/unit/kafka/server/epoch/LeaderEpochFileCacheTest.scala b/core/src/test/scala/unit/kafka/server/epoch/LeaderEpochFileCacheTest.scala index c6b5f0c7798a3..b9e7542e4cc5b 100644 --- a/core/src/test/scala/unit/kafka/server/epoch/LeaderEpochFileCacheTest.scala +++ b/core/src/test/scala/unit/kafka/server/epoch/LeaderEpochFileCacheTest.scala @@ -21,7 +21,7 @@ import kafka.utils.TestUtils import org.apache.kafka.common.TopicPartition import org.apache.kafka.common.requests.OffsetsForLeaderEpochResponse.{UNDEFINED_EPOCH, UNDEFINED_EPOCH_OFFSET} import org.apache.kafka.server.util.MockTime -import org.apache.kafka.storage.internals.checkpoint.{LeaderEpochCheckpoint, LeaderEpochCheckpointFile} +import org.apache.kafka.storage.internals.checkpoint.LeaderEpochCheckpointFile import org.apache.kafka.storage.internals.epoch.LeaderEpochFileCache import org.apache.kafka.storage.internals.log.{EpochEntry, LogDirFailureChannel} import org.junit.jupiter.api.Assertions._ @@ -29,7 +29,6 @@ import org.junit.jupiter.api.Test import java.io.File import java.util.{Collections, Optional, OptionalInt} -import scala.collection.Seq import scala.jdk.CollectionConverters._ /** @@ -38,11 +37,7 @@ import scala.jdk.CollectionConverters._ class LeaderEpochFileCacheTest { val tp = new TopicPartition("TestTopic", 5) val mockTime = new MockTime() - private val checkpoint: LeaderEpochCheckpoint = new LeaderEpochCheckpoint { - private var epochs: Seq[EpochEntry] = Seq() - override def write(epochs: java.util.Collection[EpochEntry]): Unit = this.epochs = epochs.asScala.toSeq - override def read(): java.util.List[EpochEntry] = this.epochs.asJava - } + private val checkpoint: LeaderEpochCheckpointFile = new LeaderEpochCheckpointFile(TestUtils.tempFile(), new LogDirFailureChannel(1)) private val cache = new LeaderEpochFileCache(tp, checkpoint, mockTime.scheduler) @@ -59,7 +54,7 @@ class LeaderEpochFileCacheTest { cache.assign(10, 20) assertEquals(OptionalInt.of(4), cache.previousEpoch) - cache.truncateFromEnd(18, false) + cache.truncateFromEnd(18) assertEquals(OptionalInt.of(2), cache.previousEpoch) } @@ -389,7 +384,7 @@ class LeaderEpochFileCacheTest { cache.assign(4, 11) //When clear latest on epoch boundary - cache.truncateFromEnd(8, false) + cache.truncateFromEnd(8) //Then should remove two latest epochs (remove is inclusive) assertEquals(java.util.Arrays.asList(new EpochEntry(2, 6)), cache.epochEntries) @@ -403,7 +398,7 @@ class LeaderEpochFileCacheTest { cache.assign(4, 11) //When reset to offset ON epoch boundary - cache.truncateFromStart(8, false) + cache.truncateFromStart(8) //Then should preserve (3, 8) assertEquals(java.util.Arrays.asList(new EpochEntry(3, 8), new EpochEntry(4, 11)), cache.epochEntries) @@ -417,7 +412,7 @@ class LeaderEpochFileCacheTest { cache.assign(4, 11) //When reset to offset BETWEEN epoch boundaries - cache.truncateFromStart(9, false) + cache.truncateFromStart(9) //Then we should retain epoch 3, but update it's offset to 9 as 8 has been removed assertEquals(java.util.Arrays.asList(new EpochEntry(3, 9), new EpochEntry(4, 11)), cache.epochEntries) @@ -431,7 +426,7 @@ class LeaderEpochFileCacheTest { cache.assign(4, 11) //When reset to offset before first epoch offset - cache.truncateFromStart(1, false) + cache.truncateFromStart(1) //Then nothing should change assertEquals(java.util.Arrays.asList(new EpochEntry(2, 6),new EpochEntry(3, 8), new EpochEntry(4, 11)), cache.epochEntries) @@ -445,7 +440,7 @@ class LeaderEpochFileCacheTest { cache.assign(4, 11) //When reset to offset on earliest epoch boundary - cache.truncateFromStart(6, false) + cache.truncateFromStart(6) //Then nothing should change assertEquals(java.util.Arrays.asList(new EpochEntry(2, 6),new EpochEntry(3, 8), new EpochEntry(4, 11)), cache.epochEntries) @@ -459,7 +454,7 @@ class LeaderEpochFileCacheTest { cache.assign(4, 11) //When - cache.truncateFromStart(11, false) + cache.truncateFromStart(11) //Then retain the last assertEquals(Collections.singletonList(new EpochEntry(4, 11)), cache.epochEntries) @@ -473,7 +468,7 @@ class LeaderEpochFileCacheTest { cache.assign(4, 11) //When we clear from a position between offset 8 & offset 11 - cache.truncateFromStart(9, false) + cache.truncateFromStart(9) //Then we should update the middle epoch entry's offset assertEquals(java.util.Arrays.asList(new EpochEntry(3, 9), new EpochEntry(4, 11)), cache.epochEntries) @@ -487,7 +482,7 @@ class LeaderEpochFileCacheTest { cache.assign(2, 10) //When we clear from a position between offset 0 & offset 7 - cache.truncateFromStart(5, false) + cache.truncateFromStart(5) //Then we should keep epoch 0 but update the offset appropriately assertEquals(java.util.Arrays.asList(new EpochEntry(0,5), new EpochEntry(1, 7), new EpochEntry(2, 10)), @@ -502,7 +497,7 @@ class LeaderEpochFileCacheTest { cache.assign(4, 11) //When reset to offset beyond last epoch - cache.truncateFromStart(15, false) + cache.truncateFromStart(15) //Then update the last assertEquals(Collections.singletonList(new EpochEntry(4, 15)), cache.epochEntries) @@ -516,7 +511,7 @@ class LeaderEpochFileCacheTest { cache.assign(4, 11) //When reset to offset BETWEEN epoch boundaries - cache.truncateFromEnd(9, false) + cache.truncateFromEnd( 9) //Then should keep the preceding epochs assertEquals(OptionalInt.of(3), cache.latestEpoch) @@ -545,7 +540,7 @@ class LeaderEpochFileCacheTest { cache.assign(4, 11) //When reset to offset on epoch boundary - cache.truncateFromStart(UNDEFINED_EPOCH_OFFSET, false) + cache.truncateFromStart(UNDEFINED_EPOCH_OFFSET) //Then should do nothing assertEquals(3, cache.epochEntries.size) @@ -559,7 +554,7 @@ class LeaderEpochFileCacheTest { cache.assign(4, 11) //When reset to offset on epoch boundary - cache.truncateFromEnd(UNDEFINED_EPOCH_OFFSET, false) + cache.truncateFromEnd(UNDEFINED_EPOCH_OFFSET) //Then should do nothing assertEquals(3, cache.epochEntries.size) @@ -580,13 +575,13 @@ class LeaderEpochFileCacheTest { @Test def shouldClearEarliestOnEmptyCache(): Unit = { //Then - cache.truncateFromStart(7, false) + cache.truncateFromStart(7) } @Test def shouldClearLatestOnEmptyCache(): Unit = { //Then - cache.truncateFromEnd(7, false) + cache.truncateFromEnd(7) } @Test @@ -602,7 +597,7 @@ class LeaderEpochFileCacheTest { cache.assign(10, 20) assertEquals(OptionalInt.of(4), cache.previousEpoch(10)) - cache.truncateFromEnd(18, false) + cache.truncateFromEnd(18) assertEquals(OptionalInt.of(2), cache.previousEpoch(cache.latestEpoch.getAsInt)) } @@ -619,7 +614,7 @@ class LeaderEpochFileCacheTest { cache.assign(10, 20) assertEquals(Optional.of(new EpochEntry(4, 15)), cache.previousEntry(10)) - cache.truncateFromEnd(18, false) + cache.truncateFromEnd(18) assertEquals(Optional.of(new EpochEntry(2, 10)), cache.previousEntry(cache.latestEpoch.getAsInt)) } @@ -666,8 +661,8 @@ class LeaderEpochFileCacheTest { cache.assign(3, 8) cache.assign(4, 11) - cache.truncateFromEnd(11, false) - cache.truncateFromStart(8, false) + cache.truncateFromEnd(11) + cache.truncateFromStart(8) assertEquals(List(new EpochEntry(3, 8)).asJava, checkpoint.read()) } diff --git a/storage/src/main/java/org/apache/kafka/storage/internals/checkpoint/InMemoryLeaderEpochCheckpoint.java b/storage/src/main/java/org/apache/kafka/storage/internals/checkpoint/InMemoryLeaderEpochCheckpoint.java deleted file mode 100644 index 472441be3872a..0000000000000 --- a/storage/src/main/java/org/apache/kafka/storage/internals/checkpoint/InMemoryLeaderEpochCheckpoint.java +++ /dev/null @@ -1,63 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.kafka.storage.internals.checkpoint; - -import org.apache.kafka.server.common.CheckpointFile; -import org.apache.kafka.storage.internals.log.EpochEntry; - -import java.io.BufferedWriter; -import java.io.ByteArrayOutputStream; -import java.io.IOException; -import java.io.OutputStreamWriter; -import java.nio.ByteBuffer; -import java.nio.charset.StandardCharsets; -import java.util.ArrayList; -import java.util.Collection; -import java.util.Collections; -import java.util.List; - -/** - * This class stores a list of EpochEntry(LeaderEpoch + Offsets) to memory - * - * The motivation for this class is to allow remote log manager to create the RemoteLogSegmentMetadata(RLSM) - * with the correct leader epoch info for a specific segment. To do that, we need to rely on the LeaderEpochCheckpointCache - * to truncate from start and end, to get the epoch info. However, we don't really want to truncate the epochs in cache - * (and write to checkpoint file in the end). So, we introduce this InMemoryLeaderEpochCheckpoint to feed into LeaderEpochCheckpointCache, - * and when we truncate the epoch for RLSM, we can do them in memory without affecting the checkpoint file, and without interacting with file system. - */ -public class InMemoryLeaderEpochCheckpoint implements LeaderEpochCheckpoint { - private List epochs = Collections.emptyList(); - - public void write(Collection epochs) { - this.epochs = new ArrayList<>(epochs); - } - - public List read() { - return Collections.unmodifiableList(epochs); - } - - public ByteBuffer readAsByteBuffer() throws IOException { - ByteArrayOutputStream stream = new ByteArrayOutputStream(); - try (BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(stream, StandardCharsets.UTF_8))) { - CheckpointFile.CheckpointWriteBuffer writeBuffer = new CheckpointFile.CheckpointWriteBuffer<>(writer, 0, LeaderEpochCheckpointFile.FORMATTER); - writeBuffer.write(epochs); - writer.flush(); - } - - return ByteBuffer.wrap(stream.toByteArray()); - } -} diff --git a/storage/src/main/java/org/apache/kafka/storage/internals/checkpoint/LeaderEpochCheckpoint.java b/storage/src/main/java/org/apache/kafka/storage/internals/checkpoint/LeaderEpochCheckpoint.java deleted file mode 100644 index 29e5c75b97717..0000000000000 --- a/storage/src/main/java/org/apache/kafka/storage/internals/checkpoint/LeaderEpochCheckpoint.java +++ /dev/null @@ -1,32 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.kafka.storage.internals.checkpoint; - -import org.apache.kafka.storage.internals.log.EpochEntry; - -import java.util.Collection; -import java.util.List; - -public interface LeaderEpochCheckpoint { - void write(Collection epochs); - - default void writeForTruncation(Collection epochs) { - write(epochs); - } - - List read(); -} diff --git a/storage/src/main/java/org/apache/kafka/storage/internals/checkpoint/LeaderEpochCheckpointFile.java b/storage/src/main/java/org/apache/kafka/storage/internals/checkpoint/LeaderEpochCheckpointFile.java index 6c6b2de2780a5..392a36533400f 100644 --- a/storage/src/main/java/org/apache/kafka/storage/internals/checkpoint/LeaderEpochCheckpointFile.java +++ b/storage/src/main/java/org/apache/kafka/storage/internals/checkpoint/LeaderEpochCheckpointFile.java @@ -38,7 +38,7 @@ * 1 2 * -----checkpoint file end---------- */ -public class LeaderEpochCheckpointFile implements LeaderEpochCheckpoint { +public class LeaderEpochCheckpointFile { public static final Formatter FORMATTER = new Formatter(); @@ -52,12 +52,10 @@ public LeaderEpochCheckpointFile(File file, LogDirFailureChannel logDirFailureCh checkpoint = new CheckpointFileWithFailureHandler<>(file, CURRENT_VERSION, FORMATTER, logDirFailureChannel, file.getParentFile().getParent()); } - @Override public void write(Collection epochs) { checkpoint.write(epochs); } - @Override public void writeForTruncation(Collection epochs) { // Writing epoch entries after truncation is done asynchronously for performance reasons. // This could cause NoSuchFileException when the directory is renamed concurrently for topic deletion, @@ -65,7 +63,6 @@ public void writeForTruncation(Collection epochs) { checkpoint.writeIfDirExists(epochs); } - @Override public List read() { return checkpoint.read(); } diff --git a/storage/src/main/java/org/apache/kafka/storage/internals/epoch/LeaderEpochFileCache.java b/storage/src/main/java/org/apache/kafka/storage/internals/epoch/LeaderEpochFileCache.java index d84f191db2723..d7d5fd61f2ec7 100644 --- a/storage/src/main/java/org/apache/kafka/storage/internals/epoch/LeaderEpochFileCache.java +++ b/storage/src/main/java/org/apache/kafka/storage/internals/epoch/LeaderEpochFileCache.java @@ -19,15 +19,17 @@ import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.utils.LogContext; import org.apache.kafka.server.util.Scheduler; +import org.apache.kafka.storage.internals.checkpoint.LeaderEpochCheckpointFile; import org.apache.kafka.storage.internals.log.EpochEntry; -import org.apache.kafka.storage.internals.checkpoint.LeaderEpochCheckpoint; import org.slf4j.Logger; import java.util.AbstractMap; import java.util.ArrayList; +import java.util.Collections; import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.Map.Entry; import java.util.NavigableMap; import java.util.Optional; import java.util.OptionalInt; @@ -44,13 +46,13 @@ * Leader Epoch = epoch assigned to each leader by the controller. * Offset = offset of the first message in each epoch. *

- * Note that {@link #truncateFromStart},{@link #truncateFromEnd} may flush the epoch-entry changes to checkpoint asynchronously. + * Note that {@link #truncateFromStart},{@link #truncateFromEnd} flush the epoch-entry changes to checkpoint asynchronously. * Hence, it is instantiater's responsibility to ensure restoring the cache to the correct state after instantiating * this class from checkpoint (which might contain stale epoch entries right after instantiation). */ public class LeaderEpochFileCache { private final TopicPartition topicPartition; - private final LeaderEpochCheckpoint checkpoint; + private final LeaderEpochCheckpointFile checkpoint; private final Scheduler scheduler; private final Logger log; @@ -64,7 +66,7 @@ public class LeaderEpochFileCache { * @param scheduler the scheduler to use for async I/O operations */ @SuppressWarnings("this-escape") - public LeaderEpochFileCache(TopicPartition topicPartition, LeaderEpochCheckpoint checkpoint, Scheduler scheduler) { + public LeaderEpochFileCache(TopicPartition topicPartition, LeaderEpochCheckpointFile checkpoint, Scheduler scheduler) { this.checkpoint = checkpoint; this.topicPartition = topicPartition; this.scheduler = scheduler; @@ -83,7 +85,7 @@ public LeaderEpochFileCache(TopicPartition topicPartition, LeaderEpochCheckpoint */ private LeaderEpochFileCache(List epochEntries, TopicPartition topicPartition, - LeaderEpochCheckpoint checkpoint, + LeaderEpochCheckpointFile checkpoint, Scheduler scheduler) { this.checkpoint = checkpoint; this.topicPartition = topicPartition; @@ -147,7 +149,9 @@ private boolean assign(EpochEntry entry) { * Remove any entries which violate monotonicity prior to appending a new entry */ private void maybeTruncateNonMonotonicEntries(EpochEntry newEntry) { - List removedEpochs = removeFromEnd(entry -> entry.epoch >= newEntry.epoch || entry.startOffset >= newEntry.startOffset); + List removedEpochs = removeFromEnd( + epochs, + entry -> entry.epoch >= newEntry.epoch || entry.startOffset >= newEntry.startOffset); if (removedEpochs.size() > 1 || (!removedEpochs.isEmpty() && removedEpochs.get(0).startOffset != newEntry.startOffset)) { @@ -158,15 +162,17 @@ private void maybeTruncateNonMonotonicEntries(EpochEntry newEntry) { } } - private List removeFromEnd(Predicate predicate) { + private static List removeFromEnd( + TreeMap epochs, Predicate predicate) { return removeWhileMatching(epochs.descendingMap().entrySet().iterator(), predicate); } - private List removeFromStart(Predicate predicate) { + private static List removeFromStart( + TreeMap epochs, Predicate predicate) { return removeWhileMatching(epochs.entrySet().iterator(), predicate); } - private List removeWhileMatching(Iterator> iterator, Predicate predicate) { + private static List removeWhileMatching(Iterator> iterator, Predicate predicate) { ArrayList removedEpochs = new ArrayList<>(); while (iterator.hasNext()) { @@ -335,22 +341,23 @@ public Map.Entry endOffsetFor(int requestedEpoch, long logEndOffs /** * Removes all epoch entries from the store with start offsets greater than or equal to the passed offset. - * - * @param endOffset the offset to clear up to - * @param flushSync if true, the method will block until the changes are flushed to file + *

+ * Checkpoint-flushing is done asynchronously. */ - public void truncateFromEnd(long endOffset, boolean flushSync) { + public void truncateFromEnd(long endOffset) { lock.writeLock().lock(); try { - Optional epochEntry = latestEntry(); - if (endOffset >= 0 && epochEntry.isPresent() && epochEntry.get().startOffset >= endOffset) { - List removedEntries = removeFromEnd(x -> x.startOffset >= endOffset); - - if (flushSync) { - writeToFileForTruncation(); - } else { - scheduler.scheduleOnce("leader-epoch-cache-flush-" + topicPartition, this::writeToFileForTruncation); - } + List removedEntries = truncateFromEnd(epochs, endOffset); + if (!removedEntries.isEmpty()) { + // We flush the change to the device in the background because: + // - To avoid fsync latency + // * fsync latency could be huge on a disk glitch, which is not rare in spinning drives + // * This method is called by ReplicaFetcher threads, which could block replica fetching + // then causing ISR shrink or high produce response time degradation in remote scope on high fsync latency. + // - We still flush the change in #assign synchronously, meaning that it's guaranteed that the checkpoint file always has no missing entries. + // * Even when stale epochs are restored from the checkpoint file after the unclean shutdown, it will be handled by + // another truncateFromEnd call on log loading procedure, so it won't be a problem + scheduler.scheduleOnce("leader-epoch-cache-flush-" + topicPartition, this::writeToFileForTruncation); log.debug("Cleared entries {} from epoch cache after truncating to end offset {}, leaving {} entries in the cache.", removedEntries, endOffset, epochs.size()); } @@ -364,26 +371,27 @@ public void truncateFromEnd(long endOffset, boolean flushSync) { * be offset, then clears any previous epoch entries. *

* This method is exclusive: so truncateFromStart(6) will retain an entry at offset 6. + *

+ * Checkpoint-flushing is done asynchronously. * * @param startOffset the offset to clear up to - * @param flushSync if true, the method will block until the changes are flushed to file */ - public void truncateFromStart(long startOffset, boolean flushSync) { + public void truncateFromStart(long startOffset) { lock.writeLock().lock(); try { - List removedEntries = removeFromStart(entry -> entry.startOffset <= startOffset); - + List removedEntries = truncateFromStart(epochs, startOffset); if (!removedEntries.isEmpty()) { - EpochEntry firstBeforeStartOffset = removedEntries.get(removedEntries.size() - 1); - EpochEntry updatedFirstEntry = new EpochEntry(firstBeforeStartOffset.epoch, startOffset); - epochs.put(updatedFirstEntry.epoch, updatedFirstEntry); - - if (flushSync) { - writeToFileForTruncation(); - } else { - scheduler.scheduleOnce("leader-epoch-cache-flush-" + topicPartition, this::writeToFileForTruncation); - } - + // We flush the change to the device in the background because: + // - To avoid fsync latency + // * fsync latency could be huge on a disk glitch, which is not rare in spinning drives + // * This method is called as part of deleteRecords with holding UnifiedLog#lock. + // - Meanwhile all produces against the partition will be blocked, which causes req-handlers to exhaust + // - We still flush the change in #assign synchronously, meaning that it's guaranteed that the checkpoint file always has no missing entries. + // * Even when stale epochs are restored from the checkpoint file after the unclean shutdown, it will be handled by + // another truncateFromStart call on log loading procedure, so it won't be a problem + scheduler.scheduleOnce("leader-epoch-cache-flush-" + topicPartition, this::writeToFileForTruncation); + + EpochEntry updatedFirstEntry = removedEntries.get(removedEntries.size() - 1); log.debug("Cleared entries {} and rewrote first entry {} after truncating to start offset {}, leaving {} in the cache.", removedEntries, updatedFirstEntry, startOffset, epochs.size()); } } finally { @@ -391,6 +399,26 @@ public void truncateFromStart(long startOffset, boolean flushSync) { } } + private static List truncateFromStart(TreeMap epochs, long startOffset) { + List removedEntries = removeFromStart(epochs, entry -> entry.startOffset <= startOffset); + + if (!removedEntries.isEmpty()) { + EpochEntry firstBeforeStartOffset = removedEntries.get(removedEntries.size() - 1); + EpochEntry updatedFirstEntry = new EpochEntry(firstBeforeStartOffset.epoch, startOffset); + epochs.put(updatedFirstEntry.epoch, updatedFirstEntry); + } + + return removedEntries; + } + + private static List truncateFromEnd(TreeMap epochs, long endOffset) { + Optional epochEntry = Optional.ofNullable(epochs.lastEntry()).map(Entry::getValue); + if (endOffset >= 0 && epochEntry.isPresent() && epochEntry.get().startOffset >= endOffset) { + return removeFromEnd(epochs, x -> x.startOffset >= endOffset); + } + return Collections.emptyList(); + } + public OptionalInt epochForOffset(long offset) { lock.readLock().lock(); try { @@ -414,25 +442,13 @@ public OptionalInt epochForOffset(long offset) { } } - public LeaderEpochFileCache writeTo(LeaderEpochCheckpoint leaderEpochCheckpoint) { - lock.readLock().lock(); - try { - leaderEpochCheckpoint.write(epochEntries()); - // We instantiate LeaderEpochFileCache after writing leaderEpochCheckpoint, - // hence it is guaranteed that the new cache is consistent with the latest epoch entries. - return new LeaderEpochFileCache(topicPartition, leaderEpochCheckpoint, scheduler); - } finally { - lock.readLock().unlock(); - } - } - /** * Returns a new LeaderEpochFileCache which contains same * epoch entries with replacing backing checkpoint file. * @param leaderEpochCheckpoint the new checkpoint file * @return a new LeaderEpochFileCache instance */ - public LeaderEpochFileCache withCheckpoint(LeaderEpochCheckpoint leaderEpochCheckpoint) { + public LeaderEpochFileCache withCheckpoint(LeaderEpochCheckpointFile leaderEpochCheckpoint) { lock.readLock().lock(); try { return new LeaderEpochFileCache(epochEntries(), @@ -444,6 +460,26 @@ public LeaderEpochFileCache withCheckpoint(LeaderEpochCheckpoint leaderEpochChec } } + /** + * Returns the leader epoch entries within the range of the given start[exclusive] and end[inclusive] offset + * @param startOffset The start offset of the epoch entries (exclusive). + * @param endOffset The end offset of the epoch entries (inclusive) + * @return the leader epoch entries + */ + public List epochEntriesInRange(long startOffset, long endOffset) { + lock.readLock().lock(); + try { + TreeMap epochsCopy = new TreeMap<>(this.epochs); + if (startOffset >= 0) { + truncateFromStart(epochsCopy, startOffset); + } + truncateFromEnd(epochsCopy, endOffset); + return new ArrayList<>(epochsCopy.values()); + } finally { + lock.readLock().unlock(); + } + } + /** * Delete all entries. */ From c24f94936d4370b2a3f3bfad42c56198208079b4 Mon Sep 17 00:00:00 2001 From: Haruki Okada Date: Mon, 3 Jun 2024 22:17:43 +0900 Subject: [PATCH 10/11] fix conflict --- .../test/java/kafka/log/remote/RemoteLogManagerTest.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java b/core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java index 612fd2d416f4e..22a9181ccbabc 100644 --- a/core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java +++ b/core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java @@ -796,7 +796,7 @@ void testRemoteLogTaskUpdateRemoteLogSegmentMetadataAfterLogDirChanged() throws // leader epoch preparation checkpoint.write(totalEpochEntries); - LeaderEpochFileCache cache = new LeaderEpochFileCache(leaderTopicIdPartition.topicPartition(), checkpoint); + LeaderEpochFileCache cache = new LeaderEpochFileCache(leaderTopicIdPartition.topicPartition(), checkpoint, scheduler); when(mockLog.leaderEpochCache()).thenReturn(Option.apply(cache)); when(remoteLogMetadataManager.highestOffsetForEpoch(any(TopicIdPartition.class), anyInt())) .thenReturn(Optional.of(0L)) @@ -2675,7 +2675,7 @@ int lookupPositionForOffset(RemoteLogSegmentMetadata remoteLogSegmentMetadata, l } } - + @Test public void testCopyQuotaManagerConfig() { Properties defaultProps = new Properties(); @@ -2695,7 +2695,7 @@ public void testCopyQuotaManagerConfig() { assertEquals(31, rlmCopyQuotaManagerConfig.numQuotaSamples()); assertEquals(1, rlmCopyQuotaManagerConfig.quotaWindowSizeSeconds()); } - + @Test public void testFetchQuotaManagerConfig() { Properties defaultProps = new Properties(); From 2a4ef5cd9aa8f666cf04e130e9a34b364932e6e1 Mon Sep 17 00:00:00 2001 From: Haruki Okada Date: Tue, 4 Jun 2024 10:02:40 +0900 Subject: [PATCH 11/11] address feedbacks --- .../kafka/log/remote/RemoteLogManager.java | 4 +- core/src/main/scala/kafka/log/LogLoader.scala | 4 +- .../src/main/scala/kafka/log/UnifiedLog.scala | 6 +-- .../log/remote/RemoteLogManagerTest.java | 4 +- .../scala/unit/kafka/log/LogLoaderTest.scala | 2 +- .../epoch/LeaderEpochFileCacheTest.scala | 38 +++++++++---------- .../internals/epoch/LeaderEpochFileCache.java | 31 ++++++--------- 7 files changed, 40 insertions(+), 49 deletions(-) diff --git a/core/src/main/java/kafka/log/remote/RemoteLogManager.java b/core/src/main/java/kafka/log/remote/RemoteLogManager.java index ef59c23d76ebf..07c7550748f04 100644 --- a/core/src/main/java/kafka/log/remote/RemoteLogManager.java +++ b/core/src/main/java/kafka/log/remote/RemoteLogManager.java @@ -618,9 +618,9 @@ public boolean isCancelled() { * Visible for testing. * * @param log The actual log from where to take the leader-epoch checkpoint - * @param startOffset The start offset of the epoch entries (exclusive). + * @param startOffset The start offset of the epoch entries (inclusive). * If start offset is 6, then it will retain an entry at offset 6. - * @param endOffset The end offset of the epoch entries (inclusive) + * @param endOffset The end offset of the epoch entries (exclusive) * If end offset is 100, then it will remove the entries greater than or equal to 100. * @return the leader epoch entries */ diff --git a/core/src/main/scala/kafka/log/LogLoader.scala b/core/src/main/scala/kafka/log/LogLoader.scala index b0f1fdd0e1ca2..b3b0ec2c63362 100644 --- a/core/src/main/scala/kafka/log/LogLoader.scala +++ b/core/src/main/scala/kafka/log/LogLoader.scala @@ -173,14 +173,14 @@ class LogLoader( } } - leaderEpochCache.ifPresent(_.truncateFromEnd(nextOffset)) + leaderEpochCache.ifPresent(_.truncateFromEndAsyncFlush(nextOffset)) val newLogStartOffset = if (isRemoteLogEnabled) { logStartOffsetCheckpoint } else { math.max(logStartOffsetCheckpoint, segments.firstSegment.get.baseOffset) } // The earliest leader epoch may not be flushed during a hard failure. Recover it here. - leaderEpochCache.ifPresent(_.truncateFromStart(logStartOffsetCheckpoint)) + leaderEpochCache.ifPresent(_.truncateFromStartAsyncFlush(logStartOffsetCheckpoint)) // Any segment loading or recovery code must not use producerStateManager, so that we can build the full state here // from scratch. diff --git a/core/src/main/scala/kafka/log/UnifiedLog.scala b/core/src/main/scala/kafka/log/UnifiedLog.scala index 60adc0be86616..bef18806b0dc8 100644 --- a/core/src/main/scala/kafka/log/UnifiedLog.scala +++ b/core/src/main/scala/kafka/log/UnifiedLog.scala @@ -1016,7 +1016,7 @@ class UnifiedLog(@volatile var logStartOffset: Long, updatedLogStartOffset = true updateLogStartOffset(newLogStartOffset) info(s"Incremented log start offset to $newLogStartOffset due to $reason") - leaderEpochCache.foreach(_.truncateFromStart(logStartOffset)) + leaderEpochCache.foreach(_.truncateFromStartAsyncFlush(logStartOffset)) producerStateManager.onLogStartOffsetIncremented(newLogStartOffset) maybeIncrementFirstUnstableOffset() } @@ -1814,7 +1814,7 @@ class UnifiedLog(@volatile var logStartOffset: Long, // and inserted the first start offset entry, but then failed to append any entries // before another leader was elected. lock synchronized { - leaderEpochCache.foreach(_.truncateFromEnd(logEndOffset)) + leaderEpochCache.foreach(_.truncateFromEndAsyncFlush(logEndOffset)) } false @@ -1827,7 +1827,7 @@ class UnifiedLog(@volatile var logStartOffset: Long, } else { val deletedSegments = localLog.truncateTo(targetOffset) deleteProducerSnapshots(deletedSegments, asyncDelete = true) - leaderEpochCache.foreach(_.truncateFromEnd(targetOffset)) + leaderEpochCache.foreach(_.truncateFromEndAsyncFlush(targetOffset)) logStartOffset = math.min(targetOffset, logStartOffset) rebuildProducerState(targetOffset, producerStateManager) if (highWatermark >= localLog.logEndOffset) diff --git a/core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java b/core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java index 22a9181ccbabc..19fa4a8a4431d 100644 --- a/core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java +++ b/core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java @@ -2447,8 +2447,8 @@ private Map truncateAndGetLeaderEpochs(List entries, } myCheckpoint.write(entries); LeaderEpochFileCache cache = new LeaderEpochFileCache(null, myCheckpoint, scheduler); - cache.truncateFromStart(startOffset); - cache.truncateFromEnd(endOffset); + cache.truncateFromStartAsyncFlush(startOffset); + cache.truncateFromEndAsyncFlush(endOffset); return myCheckpoint.read().stream().collect(Collectors.toMap(e -> e.epoch, e -> e.startOffset)); } diff --git a/core/src/test/scala/unit/kafka/log/LogLoaderTest.scala b/core/src/test/scala/unit/kafka/log/LogLoaderTest.scala index c332aaa4c7e58..7838da5417364 100644 --- a/core/src/test/scala/unit/kafka/log/LogLoaderTest.scala +++ b/core/src/test/scala/unit/kafka/log/LogLoaderTest.scala @@ -1393,7 +1393,7 @@ class LogLoaderTest { assertEquals(java.util.Arrays.asList(new EpochEntry(1, 0), new EpochEntry(2, 1), new EpochEntry(3, 3)), leaderEpochCache.epochEntries) // deliberately remove some of the epoch entries - leaderEpochCache.truncateFromEnd(2) + leaderEpochCache.truncateFromEndAsyncFlush(2) assertNotEquals(java.util.Arrays.asList(new EpochEntry(1, 0), new EpochEntry(2, 1), new EpochEntry(3, 3)), leaderEpochCache.epochEntries) log.close() diff --git a/core/src/test/scala/unit/kafka/server/epoch/LeaderEpochFileCacheTest.scala b/core/src/test/scala/unit/kafka/server/epoch/LeaderEpochFileCacheTest.scala index b9e7542e4cc5b..6f6d0bdbda58d 100644 --- a/core/src/test/scala/unit/kafka/server/epoch/LeaderEpochFileCacheTest.scala +++ b/core/src/test/scala/unit/kafka/server/epoch/LeaderEpochFileCacheTest.scala @@ -54,7 +54,7 @@ class LeaderEpochFileCacheTest { cache.assign(10, 20) assertEquals(OptionalInt.of(4), cache.previousEpoch) - cache.truncateFromEnd(18) + cache.truncateFromEndAsyncFlush(18) assertEquals(OptionalInt.of(2), cache.previousEpoch) } @@ -384,7 +384,7 @@ class LeaderEpochFileCacheTest { cache.assign(4, 11) //When clear latest on epoch boundary - cache.truncateFromEnd(8) + cache.truncateFromEndAsyncFlush(8) //Then should remove two latest epochs (remove is inclusive) assertEquals(java.util.Arrays.asList(new EpochEntry(2, 6)), cache.epochEntries) @@ -398,7 +398,7 @@ class LeaderEpochFileCacheTest { cache.assign(4, 11) //When reset to offset ON epoch boundary - cache.truncateFromStart(8) + cache.truncateFromStartAsyncFlush(8) //Then should preserve (3, 8) assertEquals(java.util.Arrays.asList(new EpochEntry(3, 8), new EpochEntry(4, 11)), cache.epochEntries) @@ -412,7 +412,7 @@ class LeaderEpochFileCacheTest { cache.assign(4, 11) //When reset to offset BETWEEN epoch boundaries - cache.truncateFromStart(9) + cache.truncateFromStartAsyncFlush(9) //Then we should retain epoch 3, but update it's offset to 9 as 8 has been removed assertEquals(java.util.Arrays.asList(new EpochEntry(3, 9), new EpochEntry(4, 11)), cache.epochEntries) @@ -426,7 +426,7 @@ class LeaderEpochFileCacheTest { cache.assign(4, 11) //When reset to offset before first epoch offset - cache.truncateFromStart(1) + cache.truncateFromStartAsyncFlush(1) //Then nothing should change assertEquals(java.util.Arrays.asList(new EpochEntry(2, 6),new EpochEntry(3, 8), new EpochEntry(4, 11)), cache.epochEntries) @@ -440,7 +440,7 @@ class LeaderEpochFileCacheTest { cache.assign(4, 11) //When reset to offset on earliest epoch boundary - cache.truncateFromStart(6) + cache.truncateFromStartAsyncFlush(6) //Then nothing should change assertEquals(java.util.Arrays.asList(new EpochEntry(2, 6),new EpochEntry(3, 8), new EpochEntry(4, 11)), cache.epochEntries) @@ -454,7 +454,7 @@ class LeaderEpochFileCacheTest { cache.assign(4, 11) //When - cache.truncateFromStart(11) + cache.truncateFromStartAsyncFlush(11) //Then retain the last assertEquals(Collections.singletonList(new EpochEntry(4, 11)), cache.epochEntries) @@ -468,7 +468,7 @@ class LeaderEpochFileCacheTest { cache.assign(4, 11) //When we clear from a position between offset 8 & offset 11 - cache.truncateFromStart(9) + cache.truncateFromStartAsyncFlush(9) //Then we should update the middle epoch entry's offset assertEquals(java.util.Arrays.asList(new EpochEntry(3, 9), new EpochEntry(4, 11)), cache.epochEntries) @@ -482,7 +482,7 @@ class LeaderEpochFileCacheTest { cache.assign(2, 10) //When we clear from a position between offset 0 & offset 7 - cache.truncateFromStart(5) + cache.truncateFromStartAsyncFlush(5) //Then we should keep epoch 0 but update the offset appropriately assertEquals(java.util.Arrays.asList(new EpochEntry(0,5), new EpochEntry(1, 7), new EpochEntry(2, 10)), @@ -497,7 +497,7 @@ class LeaderEpochFileCacheTest { cache.assign(4, 11) //When reset to offset beyond last epoch - cache.truncateFromStart(15) + cache.truncateFromStartAsyncFlush(15) //Then update the last assertEquals(Collections.singletonList(new EpochEntry(4, 15)), cache.epochEntries) @@ -511,7 +511,7 @@ class LeaderEpochFileCacheTest { cache.assign(4, 11) //When reset to offset BETWEEN epoch boundaries - cache.truncateFromEnd( 9) + cache.truncateFromEndAsyncFlush( 9) //Then should keep the preceding epochs assertEquals(OptionalInt.of(3), cache.latestEpoch) @@ -540,7 +540,7 @@ class LeaderEpochFileCacheTest { cache.assign(4, 11) //When reset to offset on epoch boundary - cache.truncateFromStart(UNDEFINED_EPOCH_OFFSET) + cache.truncateFromStartAsyncFlush(UNDEFINED_EPOCH_OFFSET) //Then should do nothing assertEquals(3, cache.epochEntries.size) @@ -554,7 +554,7 @@ class LeaderEpochFileCacheTest { cache.assign(4, 11) //When reset to offset on epoch boundary - cache.truncateFromEnd(UNDEFINED_EPOCH_OFFSET) + cache.truncateFromEndAsyncFlush(UNDEFINED_EPOCH_OFFSET) //Then should do nothing assertEquals(3, cache.epochEntries.size) @@ -575,13 +575,13 @@ class LeaderEpochFileCacheTest { @Test def shouldClearEarliestOnEmptyCache(): Unit = { //Then - cache.truncateFromStart(7) + cache.truncateFromStartAsyncFlush(7) } @Test def shouldClearLatestOnEmptyCache(): Unit = { //Then - cache.truncateFromEnd(7) + cache.truncateFromEndAsyncFlush(7) } @Test @@ -597,7 +597,7 @@ class LeaderEpochFileCacheTest { cache.assign(10, 20) assertEquals(OptionalInt.of(4), cache.previousEpoch(10)) - cache.truncateFromEnd(18) + cache.truncateFromEndAsyncFlush(18) assertEquals(OptionalInt.of(2), cache.previousEpoch(cache.latestEpoch.getAsInt)) } @@ -614,7 +614,7 @@ class LeaderEpochFileCacheTest { cache.assign(10, 20) assertEquals(Optional.of(new EpochEntry(4, 15)), cache.previousEntry(10)) - cache.truncateFromEnd(18) + cache.truncateFromEndAsyncFlush(18) assertEquals(Optional.of(new EpochEntry(2, 10)), cache.previousEntry(cache.latestEpoch.getAsInt)) } @@ -661,8 +661,8 @@ class LeaderEpochFileCacheTest { cache.assign(3, 8) cache.assign(4, 11) - cache.truncateFromEnd(11) - cache.truncateFromStart(8) + cache.truncateFromEndAsyncFlush(11) + cache.truncateFromStartAsyncFlush(8) assertEquals(List(new EpochEntry(3, 8)).asJava, checkpoint.read()) } diff --git a/storage/src/main/java/org/apache/kafka/storage/internals/epoch/LeaderEpochFileCache.java b/storage/src/main/java/org/apache/kafka/storage/internals/epoch/LeaderEpochFileCache.java index d7d5fd61f2ec7..7b78a70993d93 100644 --- a/storage/src/main/java/org/apache/kafka/storage/internals/epoch/LeaderEpochFileCache.java +++ b/storage/src/main/java/org/apache/kafka/storage/internals/epoch/LeaderEpochFileCache.java @@ -46,7 +46,7 @@ * Leader Epoch = epoch assigned to each leader by the controller. * Offset = offset of the first message in each epoch. *

- * Note that {@link #truncateFromStart},{@link #truncateFromEnd} flush the epoch-entry changes to checkpoint asynchronously. + * Note that {@link #truncateFromStartAsyncFlush},{@link #truncateFromEndAsyncFlush} flush the epoch-entry changes to checkpoint asynchronously. * Hence, it is instantiater's responsibility to ensure restoring the cache to the correct state after instantiating * this class from checkpoint (which might contain stale epoch entries right after instantiation). */ @@ -149,8 +149,8 @@ private boolean assign(EpochEntry entry) { * Remove any entries which violate monotonicity prior to appending a new entry */ private void maybeTruncateNonMonotonicEntries(EpochEntry newEntry) { - List removedEpochs = removeFromEnd( - epochs, + List removedEpochs = removeWhileMatching( + epochs.descendingMap().entrySet().iterator(), entry -> entry.epoch >= newEntry.epoch || entry.startOffset >= newEntry.startOffset); if (removedEpochs.size() > 1 || (!removedEpochs.isEmpty() && removedEpochs.get(0).startOffset != newEntry.startOffset)) { @@ -162,16 +162,6 @@ private void maybeTruncateNonMonotonicEntries(EpochEntry newEntry) { } } - private static List removeFromEnd( - TreeMap epochs, Predicate predicate) { - return removeWhileMatching(epochs.descendingMap().entrySet().iterator(), predicate); - } - - private static List removeFromStart( - TreeMap epochs, Predicate predicate) { - return removeWhileMatching(epochs.entrySet().iterator(), predicate); - } - private static List removeWhileMatching(Iterator> iterator, Predicate predicate) { ArrayList removedEpochs = new ArrayList<>(); @@ -344,7 +334,7 @@ public Map.Entry endOffsetFor(int requestedEpoch, long logEndOffs *

* Checkpoint-flushing is done asynchronously. */ - public void truncateFromEnd(long endOffset) { + public void truncateFromEndAsyncFlush(long endOffset) { lock.writeLock().lock(); try { List removedEntries = truncateFromEnd(epochs, endOffset); @@ -376,7 +366,7 @@ public void truncateFromEnd(long endOffset) { * * @param startOffset the offset to clear up to */ - public void truncateFromStart(long startOffset) { + public void truncateFromStartAsyncFlush(long startOffset) { lock.writeLock().lock(); try { List removedEntries = truncateFromStart(epochs, startOffset); @@ -400,7 +390,8 @@ public void truncateFromStart(long startOffset) { } private static List truncateFromStart(TreeMap epochs, long startOffset) { - List removedEntries = removeFromStart(epochs, entry -> entry.startOffset <= startOffset); + List removedEntries = removeWhileMatching( + epochs.entrySet().iterator(), entry -> entry.startOffset <= startOffset); if (!removedEntries.isEmpty()) { EpochEntry firstBeforeStartOffset = removedEntries.get(removedEntries.size() - 1); @@ -414,7 +405,7 @@ private static List truncateFromStart(TreeMap e private static List truncateFromEnd(TreeMap epochs, long endOffset) { Optional epochEntry = Optional.ofNullable(epochs.lastEntry()).map(Entry::getValue); if (endOffset >= 0 && epochEntry.isPresent() && epochEntry.get().startOffset >= endOffset) { - return removeFromEnd(epochs, x -> x.startOffset >= endOffset); + return removeWhileMatching(epochs.descendingMap().entrySet().iterator(), x -> x.startOffset >= endOffset); } return Collections.emptyList(); } @@ -461,9 +452,9 @@ public LeaderEpochFileCache withCheckpoint(LeaderEpochCheckpointFile leaderEpoch } /** - * Returns the leader epoch entries within the range of the given start[exclusive] and end[inclusive] offset - * @param startOffset The start offset of the epoch entries (exclusive). - * @param endOffset The end offset of the epoch entries (inclusive) + * Returns the leader epoch entries within the range of the given start and end offset + * @param startOffset The start offset of the epoch entries (inclusive). + * @param endOffset The end offset of the epoch entries (exclusive) * @return the leader epoch entries */ public List epochEntriesInRange(long startOffset, long endOffset) {