From a7a1e6b40b940ba05b2cc475da439599bda666b4 Mon Sep 17 00:00:00 2001 From: PoAn Yang Date: Sun, 21 Jul 2024 21:33:21 +0800 Subject: [PATCH] KAFKA-17166: Use NoOpScheduler to rewrite LogManagerTest#testLogRecoveryMetrics Signed-off-by: PoAn Yang --- .../scala/unit/kafka/log/LogManagerTest.scala | 4 ++-- .../internals/epoch/LeaderEpochFileCache.java | 15 +++++++++++---- 2 files changed, 13 insertions(+), 6 deletions(-) diff --git a/core/src/test/scala/unit/kafka/log/LogManagerTest.scala b/core/src/test/scala/unit/kafka/log/LogManagerTest.scala index 6e92094007152..c70aed6c9a3d5 100755 --- a/core/src/test/scala/unit/kafka/log/LogManagerTest.scala +++ b/core/src/test/scala/unit/kafka/log/LogManagerTest.scala @@ -46,7 +46,7 @@ import java.nio.file.Files import java.util.concurrent.{ConcurrentHashMap, ConcurrentMap, Future} import java.util.{Collections, Optional, OptionalLong, Properties} import org.apache.kafka.server.metrics.KafkaYammerMetrics -import org.apache.kafka.server.util.{KafkaScheduler, MockTime} +import org.apache.kafka.server.util.{KafkaScheduler, MockTime, Scheduler} import org.apache.kafka.storage.internals.log.{CleanerConfig, FetchDataInfo, FetchIsolation, LogConfig, LogDirFailureChannel, LogStartOffsetIncrementReason, ProducerStateManagerConfig, RemoteIndexCache} import org.apache.kafka.storage.internals.checkpoint.CleanShutdownFileHandler import org.junit.jupiter.api.function.Executable @@ -965,7 +965,7 @@ class LogManagerTest { maxTransactionTimeoutMs = 5 * 60 * 1000, producerStateManagerConfig = new ProducerStateManagerConfig(5 * 60 * 1000, false), producerIdExpirationCheckIntervalMs = TransactionLogConfigs.PRODUCER_ID_EXPIRATION_CHECK_INTERVAL_MS_DEFAULT, - scheduler = mockTime.scheduler, + scheduler = mock(classOf[Scheduler]), time = mockTime, brokerTopicStats = mockBrokerTopicStats, logDirFailureChannel = mock(classOf[LogDirFailureChannel]), diff --git a/storage/src/main/java/org/apache/kafka/storage/internals/epoch/LeaderEpochFileCache.java b/storage/src/main/java/org/apache/kafka/storage/internals/epoch/LeaderEpochFileCache.java index 28b09953baa2c..496cde779961e 100644 --- a/storage/src/main/java/org/apache/kafka/storage/internals/epoch/LeaderEpochFileCache.java +++ b/storage/src/main/java/org/apache/kafka/storage/internals/epoch/LeaderEpochFileCache.java @@ -348,8 +348,7 @@ public void truncateFromEndAsyncFlush(long endOffset) { // - We still flush the change in #assign synchronously, meaning that it's guaranteed that the checkpoint file always has no missing entries. // * Even when stale epochs are restored from the checkpoint file after the unclean shutdown, it will be handled by // another truncateFromEnd call on log loading procedure, so it won't be a problem - List entries = new ArrayList<>(epochs.values()); - scheduler.scheduleOnce("leader-epoch-cache-flush-" + topicPartition, () -> checkpoint.writeForTruncation(entries)); + scheduler.scheduleOnce("leader-epoch-cache-flush-" + topicPartition, this::writeToFileForTruncation); log.debug("Cleared entries {} from epoch cache after truncating to end offset {}, leaving {} entries in the cache.", removedEntries, endOffset, epochs.size()); } @@ -381,8 +380,7 @@ public void truncateFromStartAsyncFlush(long startOffset) { // - We still flush the change in #assign synchronously, meaning that it's guaranteed that the checkpoint file always has no missing entries. // * Even when stale epochs are restored from the checkpoint file after the unclean shutdown, it will be handled by // another truncateFromStart call on log loading procedure, so it won't be a problem - List entries = new ArrayList<>(epochs.values()); - scheduler.scheduleOnce("leader-epoch-cache-flush-" + topicPartition, () -> checkpoint.writeForTruncation(entries)); + scheduler.scheduleOnce("leader-epoch-cache-flush-" + topicPartition, this::writeToFileForTruncation); EpochEntry updatedFirstEntry = removedEntries.get(removedEntries.size() - 1); log.debug("Cleared entries {} and rewrote first entry {} after truncating to start offset {}, leaving {} in the cache.", removedEntries, updatedFirstEntry, startOffset, epochs.size()); @@ -526,4 +524,13 @@ private void writeToFile() { lock.readLock().unlock(); } } + + private void writeToFileForTruncation() { + lock.readLock().lock(); + try { + checkpoint.writeForTruncation(epochs.values()); + } finally { + lock.readLock().unlock(); + } + } }