From d6402cda03400f11f8ba80f21a8969b56a0c67db Mon Sep 17 00:00:00 2001 From: Ruslan Scherbakov Date: Tue, 30 May 2023 23:51:17 +0700 Subject: [PATCH 1/5] Suggest for performance fix: KAFKA-9693 Kafka latency spikes caused by log segment flush on roll - trunk version --- .../src/main/scala/kafka/log/UnifiedLog.scala | 2 +- .../internals/log/ProducerStateManager.java | 29 ++++++++++++++++--- 2 files changed, 26 insertions(+), 5 deletions(-) diff --git a/core/src/main/scala/kafka/log/UnifiedLog.scala b/core/src/main/scala/kafka/log/UnifiedLog.scala index f4529516e3502..7c02362b0dddd 100644 --- a/core/src/main/scala/kafka/log/UnifiedLog.scala +++ b/core/src/main/scala/kafka/log/UnifiedLog.scala @@ -1498,7 +1498,7 @@ class UnifiedLog(@volatile var logStartOffset: Long, // may actually be ahead of the current producer state end offset (which corresponds to the log end offset), // we manually override the state offset here prior to taking the snapshot. producerStateManager.updateMapEndOffset(newSegment.baseOffset) - producerStateManager.takeSnapshot() + producerStateManager.takeSnapshot(scheduler) updateHighWatermarkWithLogEndOffset() // Schedule an asynchronous flush of the old segment scheduler.scheduleOnce("flush-log", () => flushUptoOffsetExclusive(newSegment.baseOffset)) diff --git a/storage/src/main/java/org/apache/kafka/storage/internals/log/ProducerStateManager.java b/storage/src/main/java/org/apache/kafka/storage/internals/log/ProducerStateManager.java index 6212711cb2712..64efc9b0f9ee9 100644 --- a/storage/src/main/java/org/apache/kafka/storage/internals/log/ProducerStateManager.java +++ b/storage/src/main/java/org/apache/kafka/storage/internals/log/ProducerStateManager.java @@ -28,6 +28,7 @@ import org.apache.kafka.common.utils.Crc32C; import org.apache.kafka.common.utils.LogContext; import org.apache.kafka.common.utils.Time; +import org.apache.kafka.server.util.Scheduler; import org.slf4j.Logger; import java.io.File; @@ -430,11 +431,19 @@ public Optional lastEntry(long producerId) { * Take a snapshot at the current end offset if one does not already exist. */ public void takeSnapshot() throws IOException { + takeSnapshot(null); + } + + /** + * Take a snapshot at the current end offset if one does not already exist. + * Flush the snapshot asynchronously if scheduler != null + */ + public void takeSnapshot(Scheduler scheduler) throws IOException { // If not a new offset, then it is not worth taking another snapshot if (lastMapOffset > lastSnapOffset) { SnapshotFile snapshotFile = new SnapshotFile(LogFileUtils.producerSnapshotFile(logDir, lastMapOffset)); long start = time.hiResClockMs(); - writeSnapshot(snapshotFile.file(), producers); + writeSnapshot(snapshotFile.file(), producers, scheduler); log.info("Wrote producer snapshot at offset {} with {} producer ids in {} ms.", lastMapOffset, producers.size(), time.hiResClockMs() - start); @@ -649,7 +658,7 @@ public static List readSnapshot(File file) throws IOExceptio } } - private static void writeSnapshot(File file, Map entries) throws IOException { + private static void writeSnapshot(File file, Map entries, Scheduler scheduler) throws IOException { Struct struct = new Struct(PID_SNAPSHOT_MAP_SCHEMA); struct.set(VERSION_FIELD, PRODUCER_SNAPSHOT_VERSION); struct.set(CRC_FIELD, 0L); // we'll fill this after writing the entries @@ -679,9 +688,21 @@ private static void writeSnapshot(File file, Map entri long crc = Crc32C.compute(buffer, PRODUCER_ENTRIES_OFFSET, buffer.limit() - PRODUCER_ENTRIES_OFFSET); ByteUtils.writeUnsignedInt(buffer, CRC_OFFSET, crc); - try (FileChannel fileChannel = FileChannel.open(file.toPath(), StandardOpenOption.CREATE, StandardOpenOption.WRITE)) { - fileChannel.write(buffer); + FileChannel fileChannel = FileChannel.open(file.toPath(), StandardOpenOption.CREATE, StandardOpenOption.WRITE); + fileChannel.write(buffer); + if (scheduler != null) { + scheduler.scheduleOnce("flush-snapshot", () -> closeSnapshotFile(fileChannel)); + } else { + closeSnapshotFile(fileChannel); + } + } + + private static void closeSnapshotFile(FileChannel fileChannel) { + try { fileChannel.force(true); + fileChannel.close(); + } catch (IOException e) { + // ignore? } } From f81efe05d7ab945cd4a72c163aa32833d86d380e Mon Sep 17 00:00:00 2001 From: Ruslan Scherbakov Date: Fri, 2 Jun 2023 01:51:28 +0700 Subject: [PATCH 2/5] KAFKA-9693 Kafka latency spikes caused by log segment flush on roll - updated change according to feedbak --- .../org/apache/kafka/common/utils/Utils.java | 24 +++++++++++++++++++ .../internals/log/ProducerStateManager.java | 24 ++++++------------- 2 files changed, 31 insertions(+), 17 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/common/utils/Utils.java b/clients/src/main/java/org/apache/kafka/common/utils/Utils.java index 2b35a4aa7a6b0..d2ef528b200a6 100644 --- a/clients/src/main/java/org/apache/kafka/common/utils/Utils.java +++ b/clients/src/main/java/org/apache/kafka/common/utils/Utils.java @@ -976,6 +976,30 @@ public static void flushDir(Path path) throws IOException { } } + /** + * Flushes dirty file to guarantee crash consistency. + * + * @throws IOException if flushing the file fails. + */ + public static void flushFile(Path path) throws IOException { + if (path != null) { + try (FileChannel fileChannel = FileChannel.open(path, StandardOpenOption.READ)) { + fileChannel.force(true); + } + } + } + + /** + * Flushes dirty file quietly, logs warning when exception happens. + */ + public static void flushFileQuietly(Path path, String name) { + try { + flushFile(path); + } catch (IOException e) { + log.warn("Failed to flush {} at path {} with exception {}", name, path, e); + } + } + /** * Closes all the provided closeables. * @throws IOException if any of the close methods throws an IOException. diff --git a/storage/src/main/java/org/apache/kafka/storage/internals/log/ProducerStateManager.java b/storage/src/main/java/org/apache/kafka/storage/internals/log/ProducerStateManager.java index 64efc9b0f9ee9..517617f803e56 100644 --- a/storage/src/main/java/org/apache/kafka/storage/internals/log/ProducerStateManager.java +++ b/storage/src/main/java/org/apache/kafka/storage/internals/log/ProducerStateManager.java @@ -24,10 +24,7 @@ import org.apache.kafka.common.protocol.types.Struct; import org.apache.kafka.common.protocol.types.Type; import org.apache.kafka.common.record.RecordBatch; -import org.apache.kafka.common.utils.ByteUtils; -import org.apache.kafka.common.utils.Crc32C; -import org.apache.kafka.common.utils.LogContext; -import org.apache.kafka.common.utils.Time; +import org.apache.kafka.common.utils.*; import org.apache.kafka.server.util.Scheduler; import org.slf4j.Logger; @@ -688,21 +685,14 @@ private static void writeSnapshot(File file, Map entri long crc = Crc32C.compute(buffer, PRODUCER_ENTRIES_OFFSET, buffer.limit() - PRODUCER_ENTRIES_OFFSET); ByteUtils.writeUnsignedInt(buffer, CRC_OFFSET, crc); - FileChannel fileChannel = FileChannel.open(file.toPath(), StandardOpenOption.CREATE, StandardOpenOption.WRITE); - fileChannel.write(buffer); - if (scheduler != null) { - scheduler.scheduleOnce("flush-snapshot", () -> closeSnapshotFile(fileChannel)); - } else { - closeSnapshotFile(fileChannel); + try (FileChannel fileChannel = FileChannel.open(file.toPath(), StandardOpenOption.CREATE, StandardOpenOption.WRITE)) { + fileChannel.write(buffer); } - } - private static void closeSnapshotFile(FileChannel fileChannel) { - try { - fileChannel.force(true); - fileChannel.close(); - } catch (IOException e) { - // ignore? + if (scheduler != null) { + scheduler.scheduleOnce("flush-producer-snapshot", () -> Utils.flushFileQuietly(file.toPath(), "producer-snapshot")); + } else { + Utils.flushFileQuietly(file.toPath(), "producer-snapshot"); } } From 41fb5a4d79c7c9a2481974f747286a039fb6aeac Mon Sep 17 00:00:00 2001 From: Ruslan Scherbakov Date: Fri, 9 Jun 2023 00:38:48 +0700 Subject: [PATCH 3/5] KAFKA-9693 Kafka latency spikes caused by log segment flush on roll - style check error corrected, open/close operations reduced for scheduler == null case --- .../storage/internals/log/ProducerStateManager.java | 13 ++++++++++--- 1 file changed, 10 insertions(+), 3 deletions(-) diff --git a/storage/src/main/java/org/apache/kafka/storage/internals/log/ProducerStateManager.java b/storage/src/main/java/org/apache/kafka/storage/internals/log/ProducerStateManager.java index 517617f803e56..9d391e4969acf 100644 --- a/storage/src/main/java/org/apache/kafka/storage/internals/log/ProducerStateManager.java +++ b/storage/src/main/java/org/apache/kafka/storage/internals/log/ProducerStateManager.java @@ -24,7 +24,11 @@ import org.apache.kafka.common.protocol.types.Struct; import org.apache.kafka.common.protocol.types.Type; import org.apache.kafka.common.record.RecordBatch; -import org.apache.kafka.common.utils.*; +import org.apache.kafka.common.utils.ByteUtils; +import org.apache.kafka.common.utils.Crc32C; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.common.utils.Utils; import org.apache.kafka.server.util.Scheduler; import org.slf4j.Logger; @@ -687,12 +691,15 @@ private static void writeSnapshot(File file, Map entri try (FileChannel fileChannel = FileChannel.open(file.toPath(), StandardOpenOption.CREATE, StandardOpenOption.WRITE)) { fileChannel.write(buffer); + if (scheduler == null) { + // directly flush to disk + fileChannel.force(true); + } } if (scheduler != null) { + // flush to disk offloaded scheduler.scheduleOnce("flush-producer-snapshot", () -> Utils.flushFileQuietly(file.toPath(), "producer-snapshot")); - } else { - Utils.flushFileQuietly(file.toPath(), "producer-snapshot"); } } From eee2ea38fd1e6f94ec7089492e4bb71805a6503e Mon Sep 17 00:00:00 2001 From: Ruslan Scherbakov Date: Thu, 15 Jun 2023 01:16:23 +0700 Subject: [PATCH 4/5] Corrected warn: skipping the third parameter --- clients/src/main/java/org/apache/kafka/common/utils/Utils.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/clients/src/main/java/org/apache/kafka/common/utils/Utils.java b/clients/src/main/java/org/apache/kafka/common/utils/Utils.java index a6ec5e8b87893..5bd9a8b7b3a24 100644 --- a/clients/src/main/java/org/apache/kafka/common/utils/Utils.java +++ b/clients/src/main/java/org/apache/kafka/common/utils/Utils.java @@ -1027,7 +1027,7 @@ public static void flushFileQuietly(Path path, String name) { try { flushFile(path); } catch (IOException e) { - log.warn("Failed to flush {} at path {} with exception {}", name, path, e); + log.warn("Failed to flush {} at path {}", name, path); } } From 7518b3231fa6a7780d5432d3c7b40c424ed4876f Mon Sep 17 00:00:00 2001 From: Ruslan Scherbakov Date: Thu, 15 Jun 2023 17:28:08 +0700 Subject: [PATCH 5/5] Including in the 3rd parameter --- clients/src/main/java/org/apache/kafka/common/utils/Utils.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/clients/src/main/java/org/apache/kafka/common/utils/Utils.java b/clients/src/main/java/org/apache/kafka/common/utils/Utils.java index 5bd9a8b7b3a24..9a2b8d440a0e8 100644 --- a/clients/src/main/java/org/apache/kafka/common/utils/Utils.java +++ b/clients/src/main/java/org/apache/kafka/common/utils/Utils.java @@ -1027,7 +1027,7 @@ public static void flushFileQuietly(Path path, String name) { try { flushFile(path); } catch (IOException e) { - log.warn("Failed to flush {} at path {}", name, path); + log.warn("Failed to flush {} at path {}", name, path, e); } }