diff --git a/storage/src/main/java/org/apache/kafka/storage/internals/checkpoint/LeaderEpochCheckpointFile.java b/storage/src/main/java/org/apache/kafka/storage/internals/checkpoint/LeaderEpochCheckpointFile.java
index 392a36533400f..33b4a1d5bbdee 100644
--- a/storage/src/main/java/org/apache/kafka/storage/internals/checkpoint/LeaderEpochCheckpointFile.java
+++ b/storage/src/main/java/org/apache/kafka/storage/internals/checkpoint/LeaderEpochCheckpointFile.java
@@ -56,7 +56,7 @@ public void write(Collection<EpochEntry> epochs) {
         checkpoint.write(epochs);
     }
 
-    public void writeForTruncation(Collection<EpochEntry> epochs) {
+    public void writeIfDirExists(Collection<EpochEntry> epochs) {
         // Writing epoch entries after truncation is done asynchronously for performance reasons.
         // This could cause NoSuchFileException when the directory is renamed concurrently for topic deletion,
         // so we use writeIfDirExists here.
diff --git a/storage/src/main/java/org/apache/kafka/storage/internals/epoch/LeaderEpochFileCache.java b/storage/src/main/java/org/apache/kafka/storage/internals/epoch/LeaderEpochFileCache.java
index 496cde779961e..ece9f4b6468e7 100644
--- a/storage/src/main/java/org/apache/kafka/storage/internals/epoch/LeaderEpochFileCache.java
+++ b/storage/src/main/java/org/apache/kafka/storage/internals/epoch/LeaderEpochFileCache.java
@@ -348,7 +348,7 @@ public void truncateFromEndAsyncFlush(long endOffset) {
             // - We still flush the change in #assign synchronously, meaning that it's guaranteed that the checkpoint file always has no missing entries.
             // * Even when stale epochs are restored from the checkpoint file after the unclean shutdown, it will be handled by
             //   another truncateFromEnd call on log loading procedure, so it won't be a problem
-            scheduler.scheduleOnce("leader-epoch-cache-flush-" + topicPartition, this::writeToFileForTruncation);
+            scheduler.scheduleOnce("leader-epoch-cache-flush-" + topicPartition, this::writeIfDirExists);
 
             log.debug("Cleared entries {} from epoch cache after truncating to end offset {}, leaving {} entries in the cache.", removedEntries, endOffset, epochs.size());
         }
@@ -380,7 +380,7 @@ public void truncateFromStartAsyncFlush(long startOffset) {
             // - We still flush the change in #assign synchronously, meaning that it's guaranteed that the checkpoint file always has no missing entries.
             // * Even when stale epochs are restored from the checkpoint file after the unclean shutdown, it will be handled by
             //   another truncateFromStart call on log loading procedure, so it won't be a problem
-            scheduler.scheduleOnce("leader-epoch-cache-flush-" + topicPartition, this::writeToFileForTruncation);
+            scheduler.scheduleOnce("leader-epoch-cache-flush-" + topicPartition, this::writeIfDirExists);
 
             EpochEntry updatedFirstEntry = removedEntries.get(removedEntries.size() - 1);
             log.debug("Cleared entries {} and rewrote first entry {} after truncating to start offset {}, leaving {} in the cache.", removedEntries, updatedFirstEntry, startOffset, epochs.size());
@@ -525,10 +525,13 @@ private void writeToFile() {
         }
     }
 
-    private void writeToFileForTruncation() {
+    private void writeIfDirExists() {
         lock.readLock().lock();
         try {
-            checkpoint.writeForTruncation(epochs.values());
+            // If we take a snapshot of the epoch entries here and flush them to disk outside the read lock,
+            // by the time of flushing, the leader epoch file may already be updated with newer epoch entries.
+            // Those newer entries will then be overridden with the old snapshot.
+            checkpoint.writeIfDirExists(epochs.values());
         } finally {
             lock.readLock().unlock();
         }
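
For context on what the renamed method is expected to do: below is a minimal, illustrative sketch of the "write only if the directory still exists" idea that the comments describe. The real CheckpointFileWithFailureHandler is not part of this diff, so the class, file layout, and line format here are assumptions, not the actual Kafka implementation.

import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.NoSuchFileException;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.Collection;
import java.util.List;

// Hypothetical stand-in for the checkpoint writer used by LeaderEpochCheckpointFile.
final class EpochCheckpointSketch {
    private final Path file;

    EpochCheckpointSketch(Path file) {
        this.file = file;
    }

    // Regular write: any I/O failure propagates to the caller.
    synchronized void write(Collection<String> lines) {
        try {
            Files.write(file, lines, StandardOpenOption.CREATE, StandardOpenOption.TRUNCATE_EXISTING);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Async-flush variant: if the log directory was renamed for topic deletion while the
    // flush task sat in the scheduler queue, the write target no longer exists. That is an
    // expected race here, so the NoSuchFileException is swallowed instead of failing the
    // scheduler thread.
    synchronized void writeIfDirExists(Collection<String> lines) {
        if (!Files.isDirectory(file.getParent())) {
            return; // directory already gone; nothing to checkpoint
        }
        try {
            Files.write(file, lines, StandardOpenOption.CREATE, StandardOpenOption.TRUNCATE_EXISTING);
        } catch (NoSuchFileException e) {
            // Directory disappeared between the check and the write; ignore.
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        EpochCheckpointSketch checkpoint =
            new EpochCheckpointSketch(Path.of("/tmp/some-partition-0/leader-epoch-checkpoint"));
        // Silently skips the write if /tmp/some-partition-0 has already been removed or renamed.
        checkpoint.writeIfDirExists(List.of("0 0", "1 100"));
    }
}

The second hunk's new comment is about the other half of the design: the epoch entries are read and flushed while the cache's read lock is held, so the snapshot written to disk can never be an older view that overwrites entries added concurrently by #assign.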