diff --git a/clients/src/main/java/org/apache/kafka/common/utils/Utils.java b/clients/src/main/java/org/apache/kafka/common/utils/Utils.java index cec407fcbd8c1..9a2b8d440a0e8 100644 --- a/clients/src/main/java/org/apache/kafka/common/utils/Utils.java +++ b/clients/src/main/java/org/apache/kafka/common/utils/Utils.java @@ -1007,6 +1007,30 @@ public static void flushDir(Path path) throws IOException { } } + /** + * Forces the content and metadata of the file at {@code path} to storage to guarantee crash consistency. + * @param path the file to flush, opened read-only for the flush; if {@code null} this method does nothing. + * @throws IOException if opening the file or forcing it to storage fails. + */ + public static void flushFile(Path path) throws IOException { + if (path != null) { + try (FileChannel fileChannel = FileChannel.open(path, StandardOpenOption.READ)) { + fileChannel.force(true); + } + } + } + + /** + * Flushes the file at {@code path} via {@link #flushFile(Path)}; an {@link IOException} is not thrown but logged as a warning together with {@code name} and the path. + */ + public static void flushFileQuietly(Path path, String name) { + try { + flushFile(path); + } catch (IOException e) { + log.warn("Failed to flush {} at path {}", name, path, e); + } + } + /** * Closes all the provided closeables. * @throws IOException if any of the close methods throws an IOException. diff --git a/core/src/main/scala/kafka/log/UnifiedLog.scala b/core/src/main/scala/kafka/log/UnifiedLog.scala index a2aedf0a29035..ba63479671347 100644 --- a/core/src/main/scala/kafka/log/UnifiedLog.scala +++ b/core/src/main/scala/kafka/log/UnifiedLog.scala @@ -1497,7 +1497,7 @@ class UnifiedLog(@volatile var logStartOffset: Long, // may actually be ahead of the current producer state end offset (which corresponds to the log end offset), // we manually override the state offset here prior to taking the snapshot. 
producerStateManager.updateMapEndOffset(newSegment.baseOffset) - producerStateManager.takeSnapshot() + producerStateManager.takeSnapshot(scheduler) updateHighWatermarkWithLogEndOffset() // Schedule an asynchronous flush of the old segment scheduler.scheduleOnce("flush-log", () => flushUptoOffsetExclusive(newSegment.baseOffset)) diff --git a/storage/src/main/java/org/apache/kafka/storage/internals/log/ProducerStateManager.java b/storage/src/main/java/org/apache/kafka/storage/internals/log/ProducerStateManager.java index 6212711cb2712..9d391e4969acf 100644 --- a/storage/src/main/java/org/apache/kafka/storage/internals/log/ProducerStateManager.java +++ b/storage/src/main/java/org/apache/kafka/storage/internals/log/ProducerStateManager.java @@ -28,6 +28,8 @@ import org.apache.kafka.common.utils.Crc32C; import org.apache.kafka.common.utils.LogContext; import org.apache.kafka.common.utils.Time; +import org.apache.kafka.common.utils.Utils; +import org.apache.kafka.server.util.Scheduler; import org.slf4j.Logger; import java.io.File; @@ -430,11 +432,19 @@ public Optional lastEntry(long producerId) { * Take a snapshot at the current end offset if one does not already exist. */ public void takeSnapshot() throws IOException { + takeSnapshot(null); + } + + /** + * Take a snapshot at the current end offset if one does not already exist. 
+ * Flush the snapshot asynchronously if scheduler != null + */ + public void takeSnapshot(Scheduler scheduler) throws IOException { // If not a new offset, then it is not worth taking another snapshot if (lastMapOffset > lastSnapOffset) { SnapshotFile snapshotFile = new SnapshotFile(LogFileUtils.producerSnapshotFile(logDir, lastMapOffset)); long start = time.hiResClockMs(); - writeSnapshot(snapshotFile.file(), producers); + writeSnapshot(snapshotFile.file(), producers, scheduler); log.info("Wrote producer snapshot at offset {} with {} producer ids in {} ms.", lastMapOffset, producers.size(), time.hiResClockMs() - start); @@ -649,7 +659,7 @@ public static List readSnapshot(File file) throws IOExceptio } } - private static void writeSnapshot(File file, Map entries) throws IOException { + private static void writeSnapshot(File file, Map entries, Scheduler scheduler) throws IOException { Struct struct = new Struct(PID_SNAPSHOT_MAP_SCHEMA); struct.set(VERSION_FIELD, PRODUCER_SNAPSHOT_VERSION); struct.set(CRC_FIELD, 0L); // we'll fill this after writing the entries @@ -681,7 +691,15 @@ private static void writeSnapshot(File file, Map entri try (FileChannel fileChannel = FileChannel.open(file.toPath(), StandardOpenOption.CREATE, StandardOpenOption.WRITE)) { fileChannel.write(buffer); - fileChannel.force(true); + if (scheduler == null) { + // directly flush to disk + fileChannel.force(true); + } + } + + if (scheduler != null) { + // flush to disk offloaded + scheduler.scheduleOnce("flush-producer-snapshot", () -> Utils.flushFileQuietly(file.toPath(), "producer-snapshot")); } }