From ff2b65e40ead81fd4f6b63961132a6a5cd677a6d Mon Sep 17 00:00:00 2001 From: Ruslan Scherbakov Date: Sat, 27 May 2023 01:29:23 +0700 Subject: [PATCH] Suggested fix for Kafka latency spikes during segment rolling - KAFKA-9693 --- .../scala/kafka/log/ProducerStateManager.scala | 17 ++++++++++++++--- 1 file changed, 14 insertions(+), 3 deletions(-) diff --git a/core/src/main/scala/kafka/log/ProducerStateManager.scala b/core/src/main/scala/kafka/log/ProducerStateManager.scala index 3ed1cef28789d..959308b6d1cfb 100644 --- a/core/src/main/scala/kafka/log/ProducerStateManager.scala +++ b/core/src/main/scala/kafka/log/ProducerStateManager.scala @@ -34,6 +34,7 @@ import org.apache.kafka.common.utils.{ByteUtils, Crc32C, Time, Utils} import scala.jdk.CollectionConverters._ import scala.collection.mutable.ListBuffer import scala.collection.{immutable, mutable} +import scala.concurrent.{ExecutionContext, Future} class CorruptSnapshotException(msg: String) extends KafkaException(msg) @@ -419,6 +420,8 @@ object ProducerStateManager { } } + val executor = java.util.concurrent.Executors.newFixedThreadPool(1) + private def writeSnapshot(file: File, entries: mutable.Map[Long, ProducerStateEntry]): Unit = { val struct = new Struct(PidSnapshotMapSchema) struct.set(VersionField, ProducerSnapshotVersion) @@ -449,10 +452,18 @@ object ProducerStateManager { val fileChannel = FileChannel.open(file.toPath, StandardOpenOption.CREATE, StandardOpenOption.WRITE) try { fileChannel.write(buffer) - fileChannel.force(true) - } finally { - fileChannel.close() + } catch { + case e: Exception => throw e } + + val ec = ExecutionContext.fromExecutorService(executor) + Future { + try { + fileChannel.force(true) + } finally { + fileChannel.close() + } + }(ec) } private def isSnapshotFile(file: File): Boolean = file.getName.endsWith(UnifiedLog.ProducerSnapshotFileSuffix)