From d34489ba827634a289a6da8b5ead2b0ddbfab697 Mon Sep 17 00:00:00 2001
From: PoAn Yang
Date: Thu, 18 Jul 2024 13:25:45 +0800
Subject: [PATCH] KAFKA-17142: Fix deadlock caused by LogManagerTest#testLogRecoveryMetrics

Signed-off-by: PoAn Yang
---
 .../internals/epoch/LeaderEpochFileCache.java | 17 ++++-------------
 1 file changed, 4 insertions(+), 13 deletions(-)

diff --git a/storage/src/main/java/org/apache/kafka/storage/internals/epoch/LeaderEpochFileCache.java b/storage/src/main/java/org/apache/kafka/storage/internals/epoch/LeaderEpochFileCache.java
index 1304fa311cb6f..28b09953baa2c 100644
--- a/storage/src/main/java/org/apache/kafka/storage/internals/epoch/LeaderEpochFileCache.java
+++ b/storage/src/main/java/org/apache/kafka/storage/internals/epoch/LeaderEpochFileCache.java
@@ -348,7 +348,8 @@ public void truncateFromEndAsyncFlush(long endOffset) {
                 // - We still flush the change in #assign synchronously, meaning that it's guaranteed that the checkpoint file always has no missing entries.
                 //   * Even when stale epochs are restored from the checkpoint file after the unclean shutdown, it will be handled by
                 //     another truncateFromEnd call on log loading procedure, so it won't be a problem
-                scheduler.scheduleOnce("leader-epoch-cache-flush-" + topicPartition, this::writeToFileForTruncation);
+                List<EpochEntry> entries = new ArrayList<>(epochs.values());
+                scheduler.scheduleOnce("leader-epoch-cache-flush-" + topicPartition, () -> checkpoint.writeForTruncation(entries));
 
                 log.debug("Cleared entries {} from epoch cache after truncating to end offset {}, leaving {} entries in the cache.", removedEntries, endOffset, epochs.size());
             }
@@ -380,7 +381,8 @@ public void truncateFromStartAsyncFlush(long startOffset) {
                 // - We still flush the change in #assign synchronously, meaning that it's guaranteed that the checkpoint file always has no missing entries.
                 //   * Even when stale epochs are restored from the checkpoint file after the unclean shutdown, it will be handled by
                 //     another truncateFromStart call on log loading procedure, so it won't be a problem
-                scheduler.scheduleOnce("leader-epoch-cache-flush-" + topicPartition, this::writeToFileForTruncation);
+                List<EpochEntry> entries = new ArrayList<>(epochs.values());
+                scheduler.scheduleOnce("leader-epoch-cache-flush-" + topicPartition, () -> checkpoint.writeForTruncation(entries));
 
                 EpochEntry updatedFirstEntry = removedEntries.get(removedEntries.size() - 1);
                 log.debug("Cleared entries {} and rewrote first entry {} after truncating to start offset {}, leaving {} in the cache.", removedEntries, updatedFirstEntry, startOffset, epochs.size());
@@ -524,15 +526,4 @@ private void writeToFile() {
             lock.readLock().unlock();
         }
     }
-
-    private void writeToFileForTruncation() {
-        List<EpochEntry> entries;
-        lock.readLock().lock();
-        try {
-            entries = new ArrayList<>(epochs.values());
-        } finally {
-            lock.readLock().unlock();
-        }
-        checkpoint.writeForTruncation(entries);
-    }
 }