From 184b0318f5c97c9d1b42f3961bbf4f1e22ed470c Mon Sep 17 00:00:00 2001 From: Haruki Okada Date: Fri, 18 Aug 2023 18:21:14 +0900 Subject: [PATCH 1/6] KAFKA-15046: Get rid of unnecessary fsyncs inside UnifiedLog.lock to stabilize performance --- .../org/apache/kafka/common/utils/Utils.java | 11 ++++++ .../src/main/scala/kafka/log/UnifiedLog.scala | 10 ++++-- .../checkpoints/OffsetCheckpointFile.scala | 2 +- .../log/remote/RemoteLogManagerTest.java | 2 +- .../scala/unit/kafka/log/LogSegmentTest.scala | 2 +- ...CheckpointFileWithFailureHandlerTest.scala | 2 +- .../epoch/LeaderEpochFileCacheTest.scala | 2 +- .../kafka/server/common/CheckpointFile.java | 8 +++-- .../storage/CommittedOffsetsFile.java | 4 +-- .../CheckpointFileWithFailureHandler.java | 4 +-- .../InMemoryLeaderEpochCheckpoint.java | 4 +-- .../checkpoint/LeaderEpochCheckpoint.java | 7 +++- .../checkpoint/LeaderEpochCheckpointFile.java | 8 +++-- .../internals/epoch/LeaderEpochFileCache.java | 34 +++++++++++++------ .../internals/log/ProducerStateManager.java | 20 ++++++++--- .../storage/internals/log/SnapshotFile.java | 4 +-- 16 files changed, 89 insertions(+), 35 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/common/utils/Utils.java b/clients/src/main/java/org/apache/kafka/common/utils/Utils.java index bf9d0d8f16d14..e577e3ceb741b 100644 --- a/clients/src/main/java/org/apache/kafka/common/utils/Utils.java +++ b/clients/src/main/java/org/apache/kafka/common/utils/Utils.java @@ -1020,6 +1020,17 @@ public static void flushDirIfExists(Path path) throws IOException { } } + /** + * Flushes dirty file quietly, logs warning when exception happens. + */ + public static void flushFileQuietly(Path path, String name) { + try (FileChannel fileChannel = FileChannel.open(path, StandardOpenOption.READ)) { + fileChannel.force(true); + } catch (IOException e) { + log.warn("Failed to flush {} at path {}", name, path, e); + } + } + /** * Closes all the provided closeables. * @throws IOException if any of the close methods throws an IOException. diff --git a/core/src/main/scala/kafka/log/UnifiedLog.scala b/core/src/main/scala/kafka/log/UnifiedLog.scala index 10c86183b3f06..2cdc3c214edfd 100644 --- a/core/src/main/scala/kafka/log/UnifiedLog.scala +++ b/core/src/main/scala/kafka/log/UnifiedLog.scala @@ -1617,10 +1617,16 @@ class UnifiedLog(@volatile var logStartOffset: Long, // may actually be ahead of the current producer state end offset (which corresponds to the log end offset), // we manually override the state offset here prior to taking the snapshot. producerStateManager.updateMapEndOffset(newSegment.baseOffset) - producerStateManager.takeSnapshot() + // We avoid potentially-costly fsync call, since we acquire UnifiedLog#lock here + // which could block subsequent produces in the meantime. + // flush is done in the scheduler thread along with segment flushing below + val maybeSnapshot = producerStateManager.takeSnapshot(false) updateHighWatermarkWithLogEndOffset() // Schedule an asynchronous flush of the old segment - scheduler.scheduleOnce("flush-log", () => flushUptoOffsetExclusive(newSegment.baseOffset)) + scheduler.scheduleOnce("flush-log", () => { + maybeSnapshot.ifPresent(f => Utils.flushFileQuietly(f.toPath, "producer-snapshot")) + flushUptoOffsetExclusive(newSegment.baseOffset) + }) newSegment } diff --git a/core/src/main/scala/kafka/server/checkpoints/OffsetCheckpointFile.scala b/core/src/main/scala/kafka/server/checkpoints/OffsetCheckpointFile.scala index 084e46c5ef266..de3283d21fd42 100644 --- a/core/src/main/scala/kafka/server/checkpoints/OffsetCheckpointFile.scala +++ b/core/src/main/scala/kafka/server/checkpoints/OffsetCheckpointFile.scala @@ -68,7 +68,7 @@ class OffsetCheckpointFile(val file: File, logDirFailureChannel: LogDirFailureCh def write(offsets: Map[TopicPartition, Long]): Unit = { val list: java.util.List[(TopicPartition, Long)] = new java.util.ArrayList[(TopicPartition, Long)](offsets.size) offsets.foreach(x => list.add(x)) - checkpoint.write(list) + checkpoint.write(list, true) } def read(): Map[TopicPartition, Long] = { diff --git a/core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java b/core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java index 588b31786c2d7..e628dc461f6a6 100644 --- a/core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java +++ b/core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java @@ -162,7 +162,7 @@ public class RemoteLogManagerTest { List epochs = Collections.emptyList(); @Override - public void write(Collection epochs) { + public void write(Collection epochs, boolean ignored) { this.epochs = new ArrayList<>(epochs); } diff --git a/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala b/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala index 65088b7dae3de..f497139b17c80 100644 --- a/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala +++ b/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala @@ -407,7 +407,7 @@ class LogSegmentTest { val checkpoint: LeaderEpochCheckpoint = new LeaderEpochCheckpoint { private var epochs = Seq.empty[EpochEntry] - override def write(epochs: util.Collection[EpochEntry]): Unit = { + override def write(epochs: util.Collection[EpochEntry], ignored: Boolean): Unit = { this.epochs = epochs.asScala.toSeq } diff --git a/core/src/test/scala/unit/kafka/server/checkpoints/OffsetCheckpointFileWithFailureHandlerTest.scala b/core/src/test/scala/unit/kafka/server/checkpoints/OffsetCheckpointFileWithFailureHandlerTest.scala index ddbf58d884e30..a7e370d7f4091 100644 --- a/core/src/test/scala/unit/kafka/server/checkpoints/OffsetCheckpointFileWithFailureHandlerTest.scala +++ b/core/src/test/scala/unit/kafka/server/checkpoints/OffsetCheckpointFileWithFailureHandlerTest.scala @@ -97,7 +97,7 @@ class OffsetCheckpointFileWithFailureHandlerTest extends Logging { val logDirFailureChannel = new LogDirFailureChannel(10) val checkpointFile = new CheckpointFileWithFailureHandler(file, OffsetCheckpointFile.CurrentVersion + 1, OffsetCheckpointFile.Formatter, logDirFailureChannel, file.getParent) - checkpointFile.write(Collections.singletonList(new TopicPartition("foo", 5) -> 10L)) + checkpointFile.write(Collections.singletonList(new TopicPartition("foo", 5) -> 10L), true) assertThrows(classOf[KafkaStorageException], () => new OffsetCheckpointFile(checkpointFile.file, logDirFailureChannel).read()) } diff --git a/core/src/test/scala/unit/kafka/server/epoch/LeaderEpochFileCacheTest.scala b/core/src/test/scala/unit/kafka/server/epoch/LeaderEpochFileCacheTest.scala index 3523142f8871a..8e9b18262dcc4 100644 --- a/core/src/test/scala/unit/kafka/server/epoch/LeaderEpochFileCacheTest.scala +++ b/core/src/test/scala/unit/kafka/server/epoch/LeaderEpochFileCacheTest.scala @@ -38,7 +38,7 @@ class LeaderEpochFileCacheTest { val tp = new TopicPartition("TestTopic", 5) private val checkpoint: LeaderEpochCheckpoint = new LeaderEpochCheckpoint { private var epochs: Seq[EpochEntry] = Seq() - override def write(epochs: java.util.Collection[EpochEntry]): Unit = this.epochs = epochs.asScala.toSeq + override def write(epochs: java.util.Collection[EpochEntry], ignored: Boolean): Unit = this.epochs = epochs.asScala.toSeq override def read(): java.util.List[EpochEntry] = this.epochs.asJava } diff --git a/server-common/src/main/java/org/apache/kafka/server/common/CheckpointFile.java b/server-common/src/main/java/org/apache/kafka/server/common/CheckpointFile.java index 747a16a6d293d..d84a1e1e5f044 100644 --- a/server-common/src/main/java/org/apache/kafka/server/common/CheckpointFile.java +++ b/server-common/src/main/java/org/apache/kafka/server/common/CheckpointFile.java @@ -72,7 +72,7 @@ public CheckpointFile(File file, tempPath = Paths.get(absolutePath + ".tmp"); } - public void write(Collection entries) throws IOException { + public void write(Collection entries, boolean sync) throws IOException { synchronized (lock) { // write to temp file and then swap with the existing file try (FileOutputStream fileOutputStream = new FileOutputStream(tempPath.toFile()); @@ -80,10 +80,12 @@ public void write(Collection entries) throws IOException { CheckpointWriteBuffer checkpointWriteBuffer = new CheckpointWriteBuffer<>(writer, version, formatter); checkpointWriteBuffer.write(entries); writer.flush(); - fileOutputStream.getFD().sync(); + if (sync) { + fileOutputStream.getFD().sync(); + } } - Utils.atomicMoveWithFallback(tempPath, absolutePath); + Utils.atomicMoveWithFallback(tempPath, absolutePath, sync); } } diff --git a/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/CommittedOffsetsFile.java b/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/CommittedOffsetsFile.java index 1eddc0b788b16..a08e0f30507bf 100644 --- a/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/CommittedOffsetsFile.java +++ b/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/CommittedOffsetsFile.java @@ -68,7 +68,7 @@ public Optional> fromString(String line) { } public synchronized void writeEntries(Map committedOffsets) throws IOException { - checkpointFile.write(committedOffsets.entrySet()); + checkpointFile.write(committedOffsets.entrySet(), true); } public synchronized Map readEntries() throws IOException { @@ -83,4 +83,4 @@ public synchronized Map readEntries() throws IOException { return partitionToOffsets; } -} \ No newline at end of file +} diff --git a/storage/src/main/java/org/apache/kafka/storage/internals/checkpoint/CheckpointFileWithFailureHandler.java b/storage/src/main/java/org/apache/kafka/storage/internals/checkpoint/CheckpointFileWithFailureHandler.java index f780ced9b0477..35abfb5a984d2 100644 --- a/storage/src/main/java/org/apache/kafka/storage/internals/checkpoint/CheckpointFileWithFailureHandler.java +++ b/storage/src/main/java/org/apache/kafka/storage/internals/checkpoint/CheckpointFileWithFailureHandler.java @@ -41,9 +41,9 @@ public CheckpointFileWithFailureHandler(File file, int version, CheckpointFile.E checkpointFile = new CheckpointFile<>(file, version, formatter); } - public void write(Collection entries) { + public void write(Collection entries, boolean sync) { try { - checkpointFile.write(entries); + checkpointFile.write(entries, sync); } catch (IOException e) { String msg = "Error while writing to checkpoint file " + file.getAbsolutePath(); logDirFailureChannel.maybeAddOfflineLogDir(logDir, msg, e); diff --git a/storage/src/main/java/org/apache/kafka/storage/internals/checkpoint/InMemoryLeaderEpochCheckpoint.java b/storage/src/main/java/org/apache/kafka/storage/internals/checkpoint/InMemoryLeaderEpochCheckpoint.java index 3ef30b2502a8b..386712b330fbd 100644 --- a/storage/src/main/java/org/apache/kafka/storage/internals/checkpoint/InMemoryLeaderEpochCheckpoint.java +++ b/storage/src/main/java/org/apache/kafka/storage/internals/checkpoint/InMemoryLeaderEpochCheckpoint.java @@ -42,7 +42,7 @@ public class InMemoryLeaderEpochCheckpoint implements LeaderEpochCheckpoint { private List epochs = Collections.emptyList(); - public void write(Collection epochs) { + public void write(Collection epochs, boolean ignored) { this.epochs = new ArrayList<>(epochs); } @@ -60,4 +60,4 @@ public ByteBuffer readAsByteBuffer() throws IOException { return ByteBuffer.wrap(stream.toByteArray()); } -} \ No newline at end of file +} diff --git a/storage/src/main/java/org/apache/kafka/storage/internals/checkpoint/LeaderEpochCheckpoint.java b/storage/src/main/java/org/apache/kafka/storage/internals/checkpoint/LeaderEpochCheckpoint.java index 8cf5519512622..28ffae03df0e1 100644 --- a/storage/src/main/java/org/apache/kafka/storage/internals/checkpoint/LeaderEpochCheckpoint.java +++ b/storage/src/main/java/org/apache/kafka/storage/internals/checkpoint/LeaderEpochCheckpoint.java @@ -22,8 +22,13 @@ import java.util.List; public interface LeaderEpochCheckpoint { + // in file-backed checkpoint implementation, the content should be + // synced to the device if `sync` is true + void write(Collection epochs, boolean sync); - void write(Collection epochs); + default void write(Collection epochs) { + write(epochs, true); + } List read(); } diff --git a/storage/src/main/java/org/apache/kafka/storage/internals/checkpoint/LeaderEpochCheckpointFile.java b/storage/src/main/java/org/apache/kafka/storage/internals/checkpoint/LeaderEpochCheckpointFile.java index 81527c6377a28..3472182aeea1e 100644 --- a/storage/src/main/java/org/apache/kafka/storage/internals/checkpoint/LeaderEpochCheckpointFile.java +++ b/storage/src/main/java/org/apache/kafka/storage/internals/checkpoint/LeaderEpochCheckpointFile.java @@ -53,7 +53,11 @@ public LeaderEpochCheckpointFile(File file, LogDirFailureChannel logDirFailureCh } public void write(Collection epochs) { - checkpoint.write(epochs); + write(epochs, true); + } + + public void write(Collection epochs, boolean sync) { + checkpoint.write(epochs, sync); } public List read() { @@ -75,4 +79,4 @@ public Optional fromString(String line) { return (strings.length == 2) ? Optional.of(new EpochEntry(Integer.parseInt(strings[0]), Long.parseLong(strings[1]))) : Optional.empty(); } } -} \ No newline at end of file +} diff --git a/storage/src/main/java/org/apache/kafka/storage/internals/epoch/LeaderEpochFileCache.java b/storage/src/main/java/org/apache/kafka/storage/internals/epoch/LeaderEpochFileCache.java index 4cb7744957960..f69bd48da2e16 100644 --- a/storage/src/main/java/org/apache/kafka/storage/internals/epoch/LeaderEpochFileCache.java +++ b/storage/src/main/java/org/apache/kafka/storage/internals/epoch/LeaderEpochFileCache.java @@ -72,7 +72,7 @@ public void assign(int epoch, long startOffset) { EpochEntry entry = new EpochEntry(epoch, startOffset); if (assign(entry)) { log.debug("Appended new epoch entry {}. Cache now contains {} entries.", entry, epochs.size()); - flush(); + flush(true); } } @@ -82,7 +82,7 @@ public void assign(List entries) { log.debug("Appended new epoch entry {}. Cache now contains {} entries.", entry, epochs.size()); } }); - if (!entries.isEmpty()) flush(); + if (!entries.isEmpty()) flush(true); } private boolean isUpdateNeeded(EpochEntry entry) { @@ -152,7 +152,7 @@ private List removeWhileMatching(Iterator= 0 && epochEntry.isPresent() && epochEntry.get().startOffset >= endOffset) { List removedEntries = removeFromEnd(x -> x.startOffset >= endOffset); - flush(); + // We intentionally don't force flushing change to the device here because: + // - To avoid fsync latency + // * fsync latency could be huge on a disk glitch, which is not rare in spinning drives + // * This method is called by ReplicaFetcher threads, which could block replica fetching + // then causing ISR shrink or high produce response time degradation in remote scope on high fsync latency. + // - Even when stale epochs remained in LeaderEpoch file due to the unclean shutdown, it will be handled by + // another truncateFromEnd call on log loading procedure so it won't be a problem + flush(false); log.debug("Cleared entries {} from epoch cache after truncating to end offset {}, leaving {} entries in the cache.", removedEntries, endOffset, epochs.size()); } @@ -335,7 +342,14 @@ public void truncateFromStart(long startOffset) { EpochEntry updatedFirstEntry = new EpochEntry(firstBeforeStartOffset.epoch, startOffset); epochs.put(updatedFirstEntry.epoch, updatedFirstEntry); - flush(); + // We intentionally don't force flushing change to the device here because: + // - To avoid fsync latency + // * fsync latency could be huge on a disk glitch, which is not rare in spinning drives + // * This method is called as part of deleteRecords with holding UnifiedLog#lock. + // - Meanwhile all produces against the partition will be blocked, which causes req-handlers to exhaust + // - Even when stale epochs remained in LeaderEpoch file due to the unclean shutdown, it will be recovered by + // another truncateFromStart call on log loading procedure so it won't be a problem + flush(false); log.debug("Cleared entries {} and rewrote first entry {} after truncating to start offset {}, leaving {} in the cache.", removedEntries, updatedFirstEntry, startOffset, epochs.size()); } @@ -384,7 +398,7 @@ public void clearAndFlush() { lock.writeLock().lock(); try { epochs.clear(); - flush(); + flush(true); } finally { lock.writeLock().unlock(); } @@ -421,16 +435,16 @@ public NavigableMap epochWithOffsets() { } } - private void flushTo(LeaderEpochCheckpoint leaderEpochCheckpoint) { + private void flushTo(LeaderEpochCheckpoint leaderEpochCheckpoint, boolean sync) { lock.readLock().lock(); try { - leaderEpochCheckpoint.write(epochs.values()); + leaderEpochCheckpoint.write(epochs.values(), sync); } finally { lock.readLock().unlock(); } } - private void flush() { - flushTo(this.checkpoint); + private void flush(boolean sync) { + flushTo(this.checkpoint, sync); } } diff --git a/storage/src/main/java/org/apache/kafka/storage/internals/log/ProducerStateManager.java b/storage/src/main/java/org/apache/kafka/storage/internals/log/ProducerStateManager.java index d3e48ef7057b1..9dab6a98613f8 100644 --- a/storage/src/main/java/org/apache/kafka/storage/internals/log/ProducerStateManager.java +++ b/storage/src/main/java/org/apache/kafka/storage/internals/log/ProducerStateManager.java @@ -462,14 +462,21 @@ public Optional lastEntry(long producerId) { } /** - * Take a snapshot at the current end offset if one does not already exist. + * Take a snapshot at the current end offset if one does not already exist with syncing the change to the device */ public void takeSnapshot() throws IOException { + takeSnapshot(true); + } + + /** + * Take a snapshot at the current end offset if one does not already exist, then return the snapshot file if taken. + */ + public Optional takeSnapshot(boolean sync) throws IOException { // If not a new offset, then it is not worth taking another snapshot if (lastMapOffset > lastSnapOffset) { SnapshotFile snapshotFile = new SnapshotFile(LogFileUtils.producerSnapshotFile(logDir, lastMapOffset)); long start = time.hiResClockMs(); - writeSnapshot(snapshotFile.file(), producers); + writeSnapshot(snapshotFile.file(), producers, sync); log.info("Wrote producer snapshot at offset {} with {} producer ids in {} ms.", lastMapOffset, producers.size(), time.hiResClockMs() - start); @@ -477,7 +484,10 @@ public void takeSnapshot() throws IOException { // Update the last snap offset according to the serialized map lastSnapOffset = lastMapOffset; + + return Optional.of(snapshotFile.file()); } + return Optional.empty(); } /** @@ -684,7 +694,7 @@ public static List readSnapshot(File file) throws IOExceptio } } - private static void writeSnapshot(File file, Map entries) throws IOException { + private static void writeSnapshot(File file, Map entries, boolean sync) throws IOException { Struct struct = new Struct(PID_SNAPSHOT_MAP_SCHEMA); struct.set(VERSION_FIELD, PRODUCER_SNAPSHOT_VERSION); struct.set(CRC_FIELD, 0L); // we'll fill this after writing the entries @@ -716,7 +726,9 @@ private static void writeSnapshot(File file, Map entri try (FileChannel fileChannel = FileChannel.open(file.toPath(), StandardOpenOption.CREATE, StandardOpenOption.WRITE)) { fileChannel.write(buffer); - fileChannel.force(true); + if (sync) { + fileChannel.force(true); + } } } diff --git a/storage/src/main/java/org/apache/kafka/storage/internals/log/SnapshotFile.java b/storage/src/main/java/org/apache/kafka/storage/internals/log/SnapshotFile.java index be496ab299878..7e8677b0238f4 100644 --- a/storage/src/main/java/org/apache/kafka/storage/internals/log/SnapshotFile.java +++ b/storage/src/main/java/org/apache/kafka/storage/internals/log/SnapshotFile.java @@ -63,7 +63,7 @@ public File file() { public void renameTo(String newSuffix) throws IOException { File renamed = new File(Utils.replaceSuffix(file.getPath(), "", newSuffix)); try { - Utils.atomicMoveWithFallback(file.toPath(), renamed.toPath()); + Utils.atomicMoveWithFallback(file.toPath(), renamed.toPath(), false); } finally { file = renamed; } @@ -76,4 +76,4 @@ public String toString() { ", file=" + file + ')'; } -} \ No newline at end of file +} From b3d53e3b9e65a058d48d3d1fe658cecbef162f3e Mon Sep 17 00:00:00 2001 From: Haruki Okada Date: Fri, 3 Nov 2023 16:11:05 +0900 Subject: [PATCH 2/6] address comments --- .../main/java/org/apache/kafka/common/utils/Utils.java | 10 +++++----- core/src/main/scala/kafka/log/UnifiedLog.scala | 6 +++++- .../storage/internals/epoch/LeaderEpochFileCache.java | 5 ----- 3 files changed, 10 insertions(+), 11 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/common/utils/Utils.java b/clients/src/main/java/org/apache/kafka/common/utils/Utils.java index e577e3ceb741b..cbf13841a6d1a 100644 --- a/clients/src/main/java/org/apache/kafka/common/utils/Utils.java +++ b/clients/src/main/java/org/apache/kafka/common/utils/Utils.java @@ -1021,13 +1021,13 @@ public static void flushDirIfExists(Path path) throws IOException { } /** - * Flushes dirty file quietly, logs warning when exception happens. + * Flushes dirty file with swallowing {@link NoSuchFileException} */ - public static void flushFileQuietly(Path path, String name) { + public static void flushFileIfExists(Path path) throws IOException { try (FileChannel fileChannel = FileChannel.open(path, StandardOpenOption.READ)) { fileChannel.force(true); - } catch (IOException e) { - log.warn("Failed to flush {} at path {}", name, path, e); + } catch (NoSuchFileException e) { + log.warn("Failed to flush file {}", path, e); } } @@ -1508,7 +1508,7 @@ public static Iterator covariantCast(Iterator iterator) { * Checks if a string is null, empty or whitespace only. * @param str a string to be checked * @return true if the string is null, empty or whitespace only; otherwise, return false. - */ + */ public static boolean isBlank(String str) { return str == null || str.trim().isEmpty(); } diff --git a/core/src/main/scala/kafka/log/UnifiedLog.scala b/core/src/main/scala/kafka/log/UnifiedLog.scala index 2cdc3c214edfd..0a1a7e051cb9a 100644 --- a/core/src/main/scala/kafka/log/UnifiedLog.scala +++ b/core/src/main/scala/kafka/log/UnifiedLog.scala @@ -1624,7 +1624,11 @@ class UnifiedLog(@volatile var logStartOffset: Long, updateHighWatermarkWithLogEndOffset() // Schedule an asynchronous flush of the old segment scheduler.scheduleOnce("flush-log", () => { - maybeSnapshot.ifPresent(f => Utils.flushFileQuietly(f.toPath, "producer-snapshot")) + maybeSnapshot.ifPresent(f => { + maybeHandleIOException(s"Error while deleting producer state snapshot $f for $topicPartition in dir ${dir.getParent}") { + Utils.flushFileIfExists(f.toPath) + } + }) flushUptoOffsetExclusive(newSegment.baseOffset) }) newSegment diff --git a/storage/src/main/java/org/apache/kafka/storage/internals/epoch/LeaderEpochFileCache.java b/storage/src/main/java/org/apache/kafka/storage/internals/epoch/LeaderEpochFileCache.java index f69bd48da2e16..7a71121dafb60 100644 --- a/storage/src/main/java/org/apache/kafka/storage/internals/epoch/LeaderEpochFileCache.java +++ b/storage/src/main/java/org/apache/kafka/storage/internals/epoch/LeaderEpochFileCache.java @@ -151,11 +151,6 @@ private List removeWhileMatching(Iterator Date: Sat, 18 Nov 2023 17:46:18 +0900 Subject: [PATCH 3/6] address comments --- core/src/main/scala/kafka/log/UnifiedLog.scala | 5 ++--- .../kafka/storage/internals/log/ProducerStateManager.java | 2 +- .../org/apache/kafka/storage/internals/log/SnapshotFile.java | 2 +- 3 files changed, 4 insertions(+), 5 deletions(-) diff --git a/core/src/main/scala/kafka/log/UnifiedLog.scala b/core/src/main/scala/kafka/log/UnifiedLog.scala index 0a1a7e051cb9a..b15d17ed2dcb2 100644 --- a/core/src/main/scala/kafka/log/UnifiedLog.scala +++ b/core/src/main/scala/kafka/log/UnifiedLog.scala @@ -1624,11 +1624,10 @@ class UnifiedLog(@volatile var logStartOffset: Long, updateHighWatermarkWithLogEndOffset() // Schedule an asynchronous flush of the old segment scheduler.scheduleOnce("flush-log", () => { - maybeSnapshot.ifPresent(f => { + maybeSnapshot.ifPresent(f => maybeHandleIOException(s"Error while deleting producer state snapshot $f for $topicPartition in dir ${dir.getParent}") { Utils.flushFileIfExists(f.toPath) - } - }) + }) flushUptoOffsetExclusive(newSegment.baseOffset) }) newSegment diff --git a/storage/src/main/java/org/apache/kafka/storage/internals/log/ProducerStateManager.java b/storage/src/main/java/org/apache/kafka/storage/internals/log/ProducerStateManager.java index 9dab6a98613f8..ba8d214ab6209 100644 --- a/storage/src/main/java/org/apache/kafka/storage/internals/log/ProducerStateManager.java +++ b/storage/src/main/java/org/apache/kafka/storage/internals/log/ProducerStateManager.java @@ -645,7 +645,7 @@ public Optional removeAndMarkSnapshotForDeletion(long snapshotOffs // deletion, so ignoring the exception here just means that the intended operation was // already completed. try { - snapshotFile.renameTo(LogFileUtils.DELETED_FILE_SUFFIX); + snapshotFile.renameToDelete(LogFileUtils.DELETED_FILE_SUFFIX); return Optional.of(snapshotFile); } catch (NoSuchFileException ex) { log.info("Failed to rename producer state snapshot {} with deletion suffix because it was already deleted", snapshotFile.file().getAbsoluteFile()); diff --git a/storage/src/main/java/org/apache/kafka/storage/internals/log/SnapshotFile.java b/storage/src/main/java/org/apache/kafka/storage/internals/log/SnapshotFile.java index 7e8677b0238f4..8cdd6a487e773 100644 --- a/storage/src/main/java/org/apache/kafka/storage/internals/log/SnapshotFile.java +++ b/storage/src/main/java/org/apache/kafka/storage/internals/log/SnapshotFile.java @@ -60,7 +60,7 @@ public File file() { return file; } - public void renameTo(String newSuffix) throws IOException { + public void renameToDelete(String newSuffix) throws IOException { File renamed = new File(Utils.replaceSuffix(file.getPath(), "", newSuffix)); try { Utils.atomicMoveWithFallback(file.toPath(), renamed.toPath(), false); From 165ec5e1f88a0e8a6c1b30833d392dd78d578b02 Mon Sep 17 00:00:00 2001 From: Haruki Okada Date: Sun, 19 Nov 2023 04:20:57 +0900 Subject: [PATCH 4/6] added a test --- .../src/main/scala/kafka/log/UnifiedLog.scala | 13 ++++++---- .../scala/unit/kafka/log/UnifiedLogTest.scala | 24 ++++++++++++++++--- 2 files changed, 29 insertions(+), 8 deletions(-) diff --git a/core/src/main/scala/kafka/log/UnifiedLog.scala b/core/src/main/scala/kafka/log/UnifiedLog.scala index b15d17ed2dcb2..35edbdbde8c5e 100644 --- a/core/src/main/scala/kafka/log/UnifiedLog.scala +++ b/core/src/main/scala/kafka/log/UnifiedLog.scala @@ -43,7 +43,7 @@ import org.apache.kafka.storage.internals.epoch.LeaderEpochFileCache import org.apache.kafka.storage.internals.log.{AbortedTxn, AppendOrigin, BatchMetadata, CompletedTxn, EpochEntry, FetchDataInfo, FetchIsolation, LastRecord, LeaderHwChange, LogAppendInfo, LogConfig, LogDirFailureChannel, LogFileUtils, LogOffsetMetadata, LogOffsetSnapshot, LogOffsetsListener, LogStartOffsetIncrementReason, LogValidator, ProducerAppendInfo, ProducerStateManager, ProducerStateManagerConfig, RollParams} import java.io.{File, IOException} -import java.nio.file.Files +import java.nio.file.{Files, Path} import java.util import java.util.concurrent.{ConcurrentHashMap, ConcurrentMap} import java.util.{Collections, Optional, OptionalInt, OptionalLong} @@ -1624,10 +1624,7 @@ class UnifiedLog(@volatile var logStartOffset: Long, updateHighWatermarkWithLogEndOffset() // Schedule an asynchronous flush of the old segment scheduler.scheduleOnce("flush-log", () => { - maybeSnapshot.ifPresent(f => - maybeHandleIOException(s"Error while deleting producer state snapshot $f for $topicPartition in dir ${dir.getParent}") { - Utils.flushFileIfExists(f.toPath) - }) + maybeSnapshot.ifPresent(f => flushProducerStateSnapshot(f.toPath)) flushUptoOffsetExclusive(newSegment.baseOffset) }) newSegment @@ -1712,6 +1709,12 @@ class UnifiedLog(@volatile var logStartOffset: Long, producerStateManager.mapEndOffset } + private[log] def flushProducerStateSnapshot(snapshot: Path): Unit = { + maybeHandleIOException(s"Error while deleting producer state snapshot $snapshot for $topicPartition in dir ${dir.getParent}") { + Utils.flushFileIfExists(snapshot) + } + } + /** * Truncate this log so that it ends with the greatest offset < targetOffset. * diff --git a/core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala b/core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala index 0104c55e4f2a2..a50ea65320a13 100755 --- a/core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala +++ b/core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala @@ -40,8 +40,8 @@ import org.apache.kafka.storage.internals.log.{AbortedTxn, AppendOrigin, EpochEn import org.junit.jupiter.api.Assertions._ import org.junit.jupiter.api.{AfterEach, BeforeEach, Test} import org.mockito.ArgumentMatchers -import org.mockito.ArgumentMatchers.anyLong -import org.mockito.Mockito.{mock, when} +import org.mockito.ArgumentMatchers.{any, anyLong} +import org.mockito.Mockito.{doThrow, mock, spy, when} import java.io._ import java.nio.ByteBuffer @@ -3581,7 +3581,7 @@ class UnifiedLogTest { val records = TestUtils.singletonRecords(value = s"test$i".getBytes) log.appendAsLeader(records, leaderEpoch = 0) } - + log.updateHighWatermark(90L) log.maybeIncrementLogStartOffset(20L, LogStartOffsetIncrementReason.SegmentDeletion) assertEquals(20, log.logStartOffset) @@ -3796,6 +3796,24 @@ class UnifiedLogTest { log.appendAsLeader(transactionalRecords, leaderEpoch = 0, verificationGuard = verificationGuard) } + @Test + def testRecoveryPointNotIncrementedOnProducerStateSnapshotFlushFailure(): Unit = { + val logConfig = LogTestUtils.createLogConfig() + val log = spy(createLog(logDir, logConfig)) + + doThrow(new KafkaStorageException("Injected exception")).when(log).flushProducerStateSnapshot(any()) + + log.appendAsLeader(TestUtils.singletonRecords("a".getBytes), leaderEpoch = 0) + try { + log.roll(Some(1L)) + } catch { + case _: KafkaStorageException => // ignore + } + + // check that the recovery point isn't incremented + assertEquals(0L, log.recoveryPoint) + } + private def appendTransactionalToBuffer(buffer: ByteBuffer, producerId: Long, producerEpoch: Short, From 8691c8702968afa08528fa54cb989ed22cc72a4f Mon Sep 17 00:00:00 2001 From: Haruki Okada Date: Wed, 22 Nov 2023 08:28:31 +0900 Subject: [PATCH 5/6] address feedbacks --- .../internals/epoch/LeaderEpochFileCache.java | 18 +++++++----------- .../internals/log/ProducerStateManager.java | 2 +- .../storage/internals/log/SnapshotFile.java | 4 ++-- 3 files changed, 10 insertions(+), 14 deletions(-) diff --git a/storage/src/main/java/org/apache/kafka/storage/internals/epoch/LeaderEpochFileCache.java b/storage/src/main/java/org/apache/kafka/storage/internals/epoch/LeaderEpochFileCache.java index 7a71121dafb60..adac8350fa22f 100644 --- a/storage/src/main/java/org/apache/kafka/storage/internals/epoch/LeaderEpochFileCache.java +++ b/storage/src/main/java/org/apache/kafka/storage/internals/epoch/LeaderEpochFileCache.java @@ -72,7 +72,7 @@ public void assign(int epoch, long startOffset) { EpochEntry entry = new EpochEntry(epoch, startOffset); if (assign(entry)) { log.debug("Appended new epoch entry {}. Cache now contains {} entries.", entry, epochs.size()); - flush(true); + writeToFile(true); } } @@ -82,7 +82,7 @@ public void assign(List entries) { log.debug("Appended new epoch entry {}. Cache now contains {} entries.", entry, epochs.size()); } }); - if (!entries.isEmpty()) flush(true); + if (!entries.isEmpty()) writeToFile(true); } private boolean isUpdateNeeded(EpochEntry entry) { @@ -310,7 +310,7 @@ public void truncateFromEnd(long endOffset) { // then causing ISR shrink or high produce response time degradation in remote scope on high fsync latency. // - Even when stale epochs remained in LeaderEpoch file due to the unclean shutdown, it will be handled by // another truncateFromEnd call on log loading procedure so it won't be a problem - flush(false); + writeToFile(false); log.debug("Cleared entries {} from epoch cache after truncating to end offset {}, leaving {} entries in the cache.", removedEntries, endOffset, epochs.size()); } @@ -344,7 +344,7 @@ public void truncateFromStart(long startOffset) { // - Meanwhile all produces against the partition will be blocked, which causes req-handlers to exhaust // - Even when stale epochs remained in LeaderEpoch file due to the unclean shutdown, it will be recovered by // another truncateFromStart call on log loading procedure so it won't be a problem - flush(false); + writeToFile(false); log.debug("Cleared entries {} and rewrote first entry {} after truncating to start offset {}, leaving {} in the cache.", removedEntries, updatedFirstEntry, startOffset, epochs.size()); } @@ -393,7 +393,7 @@ public void clearAndFlush() { lock.writeLock().lock(); try { epochs.clear(); - flush(true); + writeToFile(true); } finally { lock.writeLock().unlock(); } @@ -430,16 +430,12 @@ public NavigableMap epochWithOffsets() { } } - private void flushTo(LeaderEpochCheckpoint leaderEpochCheckpoint, boolean sync) { + private void writeToFile(boolean sync) { lock.readLock().lock(); try { - leaderEpochCheckpoint.write(epochs.values(), sync); + this.checkpoint.write(epochs.values(), sync); } finally { lock.readLock().unlock(); } } - - private void flush(boolean sync) { - flushTo(this.checkpoint, sync); - } } diff --git a/storage/src/main/java/org/apache/kafka/storage/internals/log/ProducerStateManager.java b/storage/src/main/java/org/apache/kafka/storage/internals/log/ProducerStateManager.java index ba8d214ab6209..6bcafd2d60763 100644 --- a/storage/src/main/java/org/apache/kafka/storage/internals/log/ProducerStateManager.java +++ b/storage/src/main/java/org/apache/kafka/storage/internals/log/ProducerStateManager.java @@ -645,7 +645,7 @@ public Optional removeAndMarkSnapshotForDeletion(long snapshotOffs // deletion, so ignoring the exception here just means that the intended operation was // already completed. try { - snapshotFile.renameToDelete(LogFileUtils.DELETED_FILE_SUFFIX); + snapshotFile.renameToDelete(); return Optional.of(snapshotFile); } catch (NoSuchFileException ex) { log.info("Failed to rename producer state snapshot {} with deletion suffix because it was already deleted", snapshotFile.file().getAbsoluteFile()); diff --git a/storage/src/main/java/org/apache/kafka/storage/internals/log/SnapshotFile.java b/storage/src/main/java/org/apache/kafka/storage/internals/log/SnapshotFile.java index 8cdd6a487e773..61fdae79950db 100644 --- a/storage/src/main/java/org/apache/kafka/storage/internals/log/SnapshotFile.java +++ b/storage/src/main/java/org/apache/kafka/storage/internals/log/SnapshotFile.java @@ -60,8 +60,8 @@ public File file() { return file; } - public void renameToDelete(String newSuffix) throws IOException { - File renamed = new File(Utils.replaceSuffix(file.getPath(), "", newSuffix)); + public void renameToDelete() throws IOException { + File renamed = new File(Utils.replaceSuffix(file.getPath(), "", LogFileUtils.DELETED_FILE_SUFFIX)); try { Utils.atomicMoveWithFallback(file.toPath(), renamed.toPath(), false); } finally { From 7e40de471095467783846b4db94ebf2287a98727 Mon Sep 17 00:00:00 2001 From: Haruki Okada Date: Wed, 22 Nov 2023 09:01:02 +0900 Subject: [PATCH 6/6] address feedbacks --- .../kafka/storage/internals/epoch/LeaderEpochFileCache.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/storage/src/main/java/org/apache/kafka/storage/internals/epoch/LeaderEpochFileCache.java b/storage/src/main/java/org/apache/kafka/storage/internals/epoch/LeaderEpochFileCache.java index adac8350fa22f..dd08044404de6 100644 --- a/storage/src/main/java/org/apache/kafka/storage/internals/epoch/LeaderEpochFileCache.java +++ b/storage/src/main/java/org/apache/kafka/storage/internals/epoch/LeaderEpochFileCache.java @@ -433,7 +433,7 @@ public NavigableMap epochWithOffsets() { private void writeToFile(boolean sync) { lock.readLock().lock(); try { - this.checkpoint.write(epochs.values(), sync); + checkpoint.write(epochs.values(), sync); } finally { lock.readLock().unlock(); }