Merged
@@ -348,7 +348,8 @@ public void truncateFromEndAsyncFlush(long endOffset) {
// - We still flush the change in #assign synchronously, meaning that it's guaranteed that the checkpoint file always has no missing entries.
// * Even when stale epochs are restored from the checkpoint file after the unclean shutdown, it will be handled by
// another truncateFromEnd call on log loading procedure, so it won't be a problem
-scheduler.scheduleOnce("leader-epoch-cache-flush-" + topicPartition, this::writeToFileForTruncation);
+List<EpochEntry> entries = new ArrayList<>(epochs.values());
+scheduler.scheduleOnce("leader-epoch-cache-flush-" + topicPartition, () -> checkpoint.writeForTruncation(entries));
Contributor

This approach introduces a new correctness issue. With this change, it's possible for older epoch entries to overwrite the newer epoch entries in the leader epoch file. Consider the following sequence: we take a snapshot of the epoch entries here; a new epoch entry is added and is flushed to disk; the scheduler then writes the snapshot to disk. This can lead to the case where the leader epoch file doesn't contain all entries up to the recovery point.

Since the issue is only in the test, I am wondering if we could fix the test directly. For example, perhaps we could introduce a NoOpScheduler and use it in the test, since the test doesn't depend on the leader epoch entries to be actually flushed to disk.
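The overwrite sequence described above can be reproduced deterministically in a single thread. The following is a minimal sketch, not the actual Kafka classes: `EpochEntry`, `epochs`, and the in-memory `checkpointFile` list are illustrative stand-ins for the real cache and checkpoint file.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

public class StaleSnapshotRace {
    // Stand-in for Kafka's EpochEntry (epoch, startOffset)
    record EpochEntry(int epoch, long startOffset) {}

    static final TreeMap<Integer, EpochEntry> epochs = new TreeMap<>();
    // Stands in for the on-disk leader epoch checkpoint file
    static final List<EpochEntry> checkpointFile = new ArrayList<>();

    static void writeForTruncation(List<EpochEntry> entries) {
        checkpointFile.clear();          // the file is rewritten wholesale,
        checkpointFile.addAll(entries);  // so a stale snapshot erases newer entries
    }

    public static void main(String[] args) {
        epochs.put(1, new EpochEntry(1, 0L));

        // Step 1: truncation takes a snapshot of the current entries.
        List<EpochEntry> snapshot = new ArrayList<>(epochs.values());

        // Step 2: a new epoch entry is assigned and flushed synchronously.
        epochs.put(2, new EpochEntry(2, 100L));
        writeForTruncation(new ArrayList<>(epochs.values()));

        // Step 3: the scheduler finally writes the stale snapshot.
        writeForTruncation(snapshot);

        // The file now lacks epoch 2 even though it was flushed in step 2.
        System.out.println(checkpointFile.size()); // prints 1, not 2
    }
}
```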

Member


> Since the issue is only in the test, I am wondering if we could fix the test directly. For example, perhaps we could introduce a NoOpScheduler and use it in the test, since the test doesn't depend on the leader epoch entries to be actually flushed to disk.

this is another good approach.

> This approach introduces a new correctness issue. With this change, it's possible for older epoch entries to overwrite the newer epoch entries in the leader epoch file. Consider the following sequence: we take a snapshot of the epoch entries here; a new epoch entry is added and is flushed to disk; the scheduler then writes the snapshot to disk. This can lead to the case where the leader epoch file doesn't contain all entries up to the recovery point.

Sorry for introducing a possible correctness issue. @FrankYang0529 and I had discussed the approach offline when I noticed that deadlock, and I suggested changing the production code directly. It seems to me this PR does NOT change the execution order, because "writeToFileForTruncation" does not hold a single lock across both the "snapshot" and the "flush":

    private void writeToFileForTruncation() {
        // phase 1: create snapshot by holding read lock
        List<EpochEntry> entries;
        lock.readLock().lock();
        try {
            entries = new ArrayList<>(epochs.values());
        } finally {
            lock.readLock().unlock();
        }
        // phase 2: flush without holding the lock
        checkpoint.writeForTruncation(entries);
    }

Hence, the issue you mentioned can happen even if we revert this PR. For example:

  1. writeToFileForTruncation (run by the scheduler) takes a snapshot of the epoch entries in phase 1 (see the comment in the code above)
  2. a new epoch entry is added and is flushed to disk
  3. writeToFileForTruncation (run by the scheduler) then writes the stale snapshot to disk in phase 2 (see the comment in the code above)

In summary, there are two follow-ups:

  1. rewrite testLogRecoveryMetrics with a NoOpScheduler
  2. add writeToFileForTruncation back, but without the separate snapshot phase. For example:
    private void writeToFileForTruncation() {
        lock.readLock().lock();
        try {
            checkpoint.writeForTruncation(epochs.values());
        } finally {
            lock.readLock().unlock();
        }
    }

@junrao WDYT?

Member


The suggestion makes sense to me.

Contributor (@ocadaruma, Jul 19, 2024)


@chia7712 Hi, thank you for pointing out that the potential race issue exists even in the current code.

The follow-up looks good to me.

For follow-up 2, which moves the checkpoint flush inside the lock, one concern is that request-handler/replica-fetcher threads may block on the fsync latency (i.e. threads calling truncateFromStart/EndAsyncFlush would be blocked in the meantime).
However, it might not be a critical performance issue because:

  • These methods are not called frequently (the typical call paths are truncation during fetch-response handling and deleteRecords handling), so a call is unlikely to arrive while writeToFileForTruncation (scheduled by a previous call) is still ongoing and cause lock contention.
    • Unless the kafka-schedulers are very busy and task execution is delayed

Let me consider whether some optimization is possible for this as another follow-up.
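The blocking concern can be seen with a `ReentrantReadWriteLock` directly: while one thread holds the read lock for the duration of the flush, any mutator needing the write lock must wait. The sketch below uses hypothetical names, and `tryLock` with a timeout stands in for the blocked mutator; it shows both sides of the trade-off: the writer cannot interleave between snapshot and file write, but it also stalls for as long as the flush (fsync included) runs.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantReadWriteLock;

public class ReadLockFlushSketch {
    static final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();

    // Returns whether a mutator could grab the write lock while a flush
    // holds the read lock. With the flush inside the read lock, it cannot.
    static boolean writerInterleavesDuringFlush() throws InterruptedException {
        lock.readLock().lock(); // flush in progress: snapshot + file write
        try {
            final boolean[] acquired = {false};
            Thread mutator = new Thread(() -> {
                try {
                    // assign()/truncate...() would block here until the
                    // flush finishes, which is the fsync-latency concern
                    acquired[0] = lock.writeLock().tryLock(100, TimeUnit.MILLISECONDS);
                    if (acquired[0]) lock.writeLock().unlock();
                } catch (InterruptedException ignored) { }
            });
            mutator.start();
            mutator.join();
            return acquired[0];
        } finally {
            lock.readLock().unlock();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("writer interleaved: " + writerInterleavesDuringFlush()); // prints false
    }
}
```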

Contributor


@chia7712 : Yes, you are right that the overwriting issue was already introduced in #15993. Moving the flush call inside the read lock fixes this issue, but it defeats the original performance optimization in #14242. @ocadaruma : What's your opinion on this?

Member Author


Hi all, thanks for raising the correctness issue. IMO, we can fix data correctness first, and then improve performance if it doesn't break data correctness.

I will rewrite testLogRecoveryMetrics with a NoOpScheduler first and then see whether we need to improve LeaderEpochFileCache performance with its own scheduler. Thank you.
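A NoOpScheduler for the test could look like the following sketch. The `Scheduler` interface here is a cut-down, hypothetical stand-in for Kafka's real scheduler interface (which has more methods); the task name mirrors the `"leader-epoch-cache-flush-"` prefix used in the diff above.

```java
public class NoOpSchedulerSketch {
    // Hypothetical, minimal stand-in for Kafka's Scheduler interface
    interface Scheduler {
        void scheduleOnce(String name, Runnable task);
    }

    // Silently drops every task: the test does not depend on the leader
    // epoch entries actually being flushed to disk, so nothing is lost,
    // and the flush can never deadlock because it never runs.
    static class NoOpScheduler implements Scheduler {
        @Override
        public void scheduleOnce(String name, Runnable task) {
            // intentionally do nothing
        }
    }

    static boolean flushRan = false;

    public static void main(String[] args) {
        Scheduler scheduler = new NoOpScheduler();
        // What truncateFromEndAsyncFlush would schedule in production:
        scheduler.scheduleOnce("leader-epoch-cache-flush-tp0", () -> flushRan = true);
        System.out.println("flush ran: " + flushRan); // prints false
    }
}
```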

Contributor


@ocadaruma : Thanks for the explanation. Yes, I agree that the async flush still gives us some perf benefits. As for the fix, the two follow-ups suggested by @chia7712 sound reasonable to me. They probably should be done in the same PR?

Member


> They probably should be done in the same PR?

I assumed that the production-code changes need more discussion. For example:

> Yeah, could be an issue in some cases (e.g. deleteRecords is called frequently, and/or kafka-schedulers are busy) though.

The two follow-ups are orthogonal now, and hence I prefer to fix them separately to avoid unnecessary blocking.

BTW, please feel free to leave more comments on https://issues.apache.org/jira/browse/KAFKA-17167 about the fix.

Contributor


Hmm, I thought the simple fix you suggested was to do the following. This will bring back the deadlock issue in the test, right?

    private void writeToFileForTruncation() {
        lock.readLock().lock();
        try {
            checkpoint.writeForTruncation(epochs.values());
        } finally {
            lock.readLock().unlock();
        }
    }

Member


> This will bring back the deadlock issue in the test, right?

Yes, it does. However, my point was that if @ocadaruma's comment ("Yeah, could be an issue in some cases (e.g. deleteRecords is called frequently, and/or kafka-schedulers are busy) though.") needs more discussion, we can improve the test before adding writeToFileForTruncation back to production.

At any rate, it seems we all agree to go with the simple fix for now, so I have filed KAFKA-17166 and KAFKA-17167.


log.debug("Cleared entries {} from epoch cache after truncating to end offset {}, leaving {} entries in the cache.", removedEntries, endOffset, epochs.size());
}
@@ -380,7 +381,8 @@ public void truncateFromStartAsyncFlush(long startOffset) {
// - We still flush the change in #assign synchronously, meaning that it's guaranteed that the checkpoint file always has no missing entries.
// * Even when stale epochs are restored from the checkpoint file after the unclean shutdown, it will be handled by
// another truncateFromStart call on log loading procedure, so it won't be a problem
-scheduler.scheduleOnce("leader-epoch-cache-flush-" + topicPartition, this::writeToFileForTruncation);
+List<EpochEntry> entries = new ArrayList<>(epochs.values());
+scheduler.scheduleOnce("leader-epoch-cache-flush-" + topicPartition, () -> checkpoint.writeForTruncation(entries));

EpochEntry updatedFirstEntry = removedEntries.get(removedEntries.size() - 1);
log.debug("Cleared entries {} and rewrote first entry {} after truncating to start offset {}, leaving {} in the cache.", removedEntries, updatedFirstEntry, startOffset, epochs.size());
@@ -524,15 +526,4 @@ private void writeToFile() {
lock.readLock().unlock();
}
}

-    private void writeToFileForTruncation() {
-        List<EpochEntry> entries;
-        lock.readLock().lock();
-        try {
-            entries = new ArrayList<>(epochs.values());
-        } finally {
-            lock.readLock().unlock();
-        }
-        checkpoint.writeForTruncation(entries);
-    }
}