Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ public void write(Collection<EpochEntry> epochs) {
checkpoint.write(epochs);
}

public void writeForTruncation(Collection<EpochEntry> epochs) {
public void writeIfDirExists(Collection<EpochEntry> epochs) {
// Writing epoch entries after truncation is done asynchronously for performance reasons.
// This could cause NoSuchFileException when the directory is renamed concurrently for topic deletion,
// so we use writeIfDirExists here.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -348,7 +348,7 @@ public void truncateFromEndAsyncFlush(long endOffset) {
// - We still flush the change in #assign synchronously, meaning that it's guaranteed that the checkpoint file always has no missing entries.
// * Even when stale epochs are restored from the checkpoint file after an unclean shutdown, they will be handled by
// another truncateFromEnd call during the log loading procedure, so it won't be a problem
scheduler.scheduleOnce("leader-epoch-cache-flush-" + topicPartition, this::writeToFileForTruncation);
scheduler.scheduleOnce("leader-epoch-cache-flush-" + topicPartition, this::writeIfDirExists);

log.debug("Cleared entries {} from epoch cache after truncating to end offset {}, leaving {} entries in the cache.", removedEntries, endOffset, epochs.size());
}
Expand Down Expand Up @@ -380,7 +380,7 @@ public void truncateFromStartAsyncFlush(long startOffset) {
// - We still flush the change in #assign synchronously, meaning that it's guaranteed that the checkpoint file always has no missing entries.
// * Even when stale epochs are restored from the checkpoint file after an unclean shutdown, they will be handled by
// another truncateFromStart call during the log loading procedure, so it won't be a problem
scheduler.scheduleOnce("leader-epoch-cache-flush-" + topicPartition, this::writeToFileForTruncation);
scheduler.scheduleOnce("leader-epoch-cache-flush-" + topicPartition, this::writeIfDirExists);

EpochEntry updatedFirstEntry = removedEntries.get(removedEntries.size() - 1);
log.debug("Cleared entries {} and rewrote first entry {} after truncating to start offset {}, leaving {} in the cache.", removedEntries, updatedFirstEntry, startOffset, epochs.size());
Expand Down Expand Up @@ -525,10 +525,13 @@ private void writeToFile() {
}
}

private void writeToFileForTruncation() {
private void writeIfDirExists() {
lock.readLock().lock();
try {
checkpoint.writeForTruncation(epochs.values());
// If we take a snapshot of the epoch entries here and flush them to disk outside the read lock,
// by the time of flushing, the leader epoch file may already be updated with newer epoch entries.
// Those newer entries will then be overridden with the old snapshot.
checkpoint.writeIfDirExists(epochs.values());
} finally {
lock.readLock().unlock();
}
Expand Down