Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 24 additions & 0 deletions clients/src/main/java/org/apache/kafka/common/utils/Utils.java
Original file line number Diff line number Diff line change
Expand Up @@ -1007,6 +1007,30 @@ public static void flushDir(Path path) throws IOException {
}
}

/**
 * Forces any buffered writes for the given file to the storage device so the
 * data survives a crash (fsync of both content and metadata).
 *
 * @param path the file to flush; a {@code null} path is treated as a no-op.
 * @throws IOException if the file cannot be opened or the flush fails.
 */
public static void flushFile(Path path) throws IOException {
    if (path == null) {
        return;
    }
    // Opening read-only is sufficient: force(true) syncs previously written
    // content and metadata regardless of the access mode of this channel.
    try (FileChannel channel = FileChannel.open(path, StandardOpenOption.READ)) {
        channel.force(true);
    }
}

/**
 * Best-effort variant of {@link #flushFile(Path)}: attempts to flush the file
 * and, instead of propagating an {@link IOException}, records it as a warning.
 *
 * @param path the file to flush; a {@code null} path is treated as a no-op.
 * @param name human-readable description of the file, used in the log message.
 */
public static void flushFileQuietly(Path path, String name) {
    try {
        flushFile(path);
    } catch (IOException exception) {
        log.warn("Failed to flush {} at path {}", name, path, exception);
    }
}

/**
* Closes all the provided closeables.
* @throws IOException if any of the close methods throws an IOException.
Expand Down
2 changes: 1 addition & 1 deletion core/src/main/scala/kafka/log/UnifiedLog.scala
Original file line number Diff line number Diff line change
Expand Up @@ -1497,7 +1497,7 @@ class UnifiedLog(@volatile var logStartOffset: Long,
// may actually be ahead of the current producer state end offset (which corresponds to the log end offset),
// we manually override the state offset here prior to taking the snapshot.
producerStateManager.updateMapEndOffset(newSegment.baseOffset)
producerStateManager.takeSnapshot()
producerStateManager.takeSnapshot(scheduler)
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am wondering if we need to increase the default size of background threads since we are adding more responsibility to it. Thoughts?

updateHighWatermarkWithLogEndOffset()
// Schedule an asynchronous flush of the old segment
scheduler.scheduleOnce("flush-log", () => flushUptoOffsetExclusive(newSegment.baseOffset))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@
import org.apache.kafka.common.utils.Crc32C;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.server.util.Scheduler;
import org.slf4j.Logger;

import java.io.File;
Expand Down Expand Up @@ -430,11 +432,19 @@ public Optional<ProducerStateEntry> lastEntry(long producerId) {
* Take a snapshot at the current end offset if one does not already exist.
*/
public void takeSnapshot() throws IOException {
    // Synchronous variant: passing a null scheduler makes the overload fsync
    // the snapshot file inline on the calling thread instead of offloading
    // the flush to a background task.
    takeSnapshot(null);
}

/**
* Take a snapshot at the current end offset if one does not already exist.
* Flush the snapshot asynchronously if scheduler != null
*/
public void takeSnapshot(Scheduler scheduler) throws IOException {
Comment thread
novosibman marked this conversation as resolved.
// If not a new offset, then it is not worth taking another snapshot
if (lastMapOffset > lastSnapOffset) {
SnapshotFile snapshotFile = new SnapshotFile(LogFileUtils.producerSnapshotFile(logDir, lastMapOffset));
long start = time.hiResClockMs();
writeSnapshot(snapshotFile.file(), producers);
writeSnapshot(snapshotFile.file(), producers, scheduler);
log.info("Wrote producer snapshot at offset {} with {} producer ids in {} ms.", lastMapOffset,
producers.size(), time.hiResClockMs() - start);

Expand Down Expand Up @@ -649,7 +659,7 @@ public static List<ProducerStateEntry> readSnapshot(File file) throws IOExceptio
}
}

private static void writeSnapshot(File file, Map<Long, ProducerStateEntry> entries) throws IOException {
private static void writeSnapshot(File file, Map<Long, ProducerStateEntry> entries, Scheduler scheduler) throws IOException {
Struct struct = new Struct(PID_SNAPSHOT_MAP_SCHEMA);
struct.set(VERSION_FIELD, PRODUCER_SNAPSHOT_VERSION);
struct.set(CRC_FIELD, 0L); // we'll fill this after writing the entries
Expand Down Expand Up @@ -681,7 +691,15 @@ private static void writeSnapshot(File file, Map<Long, ProducerStateEntry> entri

try (FileChannel fileChannel = FileChannel.open(file.toPath(), StandardOpenOption.CREATE, StandardOpenOption.WRITE)) {
fileChannel.write(buffer);
fileChannel.force(true);
if (scheduler == null) {
// directly flush to disk
fileChannel.force(true);
}
}

if (scheduler != null) {
// flush to disk offloaded
scheduler.scheduleOnce("flush-producer-snapshot", () -> Utils.flushFileQuietly(file.toPath(), "producer-snapshot"));
}
}

Expand Down