From c7567f2d61297ae329c48d2e8e10ae881ec507a9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jos=C3=A9=20Armando=20Garc=C3=ADa=20Sancio?= Date: Mon, 29 Mar 2021 18:17:36 -0700 Subject: [PATCH 1/6] KAFKA-12543: Change RawSnapshotReader ownership model --- .../scala/kafka/raft/KafkaMetadataLog.scala | 131 +++++----- .../kafka/raft/KafkaMetadataLogTest.scala | 4 +- .../apache/kafka/raft/KafkaRaftClient.java | 81 ++++--- .../kafka/snapshot/FileRawSnapshotReader.java | 10 +- .../kafka/snapshot/RawSnapshotReader.java | 4 +- .../org/apache/kafka/snapshot/Snapshots.java | 36 ++- .../raft/KafkaRaftClientSnapshotTest.java | 223 +++++++++--------- .../java/org/apache/kafka/raft/MockLog.java | 3 - .../org/apache/kafka/raft/MockLogTest.java | 5 +- .../apache/kafka/snapshot/SnapshotsTest.java | 2 +- 10 files changed, 262 insertions(+), 237 deletions(-) diff --git a/core/src/main/scala/kafka/raft/KafkaMetadataLog.scala b/core/src/main/scala/kafka/raft/KafkaMetadataLog.scala index d01f1c97915cb..1311917899ff6 100644 --- a/core/src/main/scala/kafka/raft/KafkaMetadataLog.scala +++ b/core/src/main/scala/kafka/raft/KafkaMetadataLog.scala @@ -16,29 +16,30 @@ */ package kafka.raft -import java.io.{File, IOException} +import java.io.File import java.nio.file.{Files, NoSuchFileException} -import java.util.concurrent.ConcurrentSkipListSet import java.util.{Optional, Properties} +import java.{util => ju} import kafka.api.ApiVersion import kafka.log.{AppendOrigin, Log, LogConfig, LogOffsetSnapshot, SnapshotGenerated} import kafka.server.{BrokerTopicStats, FetchHighWatermark, FetchLogEnd, LogDirFailureChannel} import kafka.utils.{Logging, Scheduler} import org.apache.kafka.common.record.{MemoryRecords, Records} -import org.apache.kafka.common.utils.{Time, Utils} +import org.apache.kafka.common.utils.Time import org.apache.kafka.common.{KafkaException, TopicPartition, Uuid} import org.apache.kafka.raft.{Isolation, LogAppendInfo, LogFetchInfo, LogOffsetMetadata, OffsetAndEpoch, OffsetMetadata, 
ReplicatedLog} import org.apache.kafka.snapshot.{FileRawSnapshotReader, FileRawSnapshotWriter, RawSnapshotReader, RawSnapshotWriter, SnapshotPath, Snapshots} import scala.compat.java8.OptionConverters._ +import scala.jdk.CollectionConverters._ final class KafkaMetadataLog private ( log: Log, scheduler: Scheduler, // This object needs to be thread-safe because it is used by the snapshotting thread to notify the // polling thread when snapshots are created. - snapshotIds: ConcurrentSkipListSet[OffsetAndEpoch], + snapshots: ju.TreeMap[OffsetAndEpoch, Optional[FileRawSnapshotReader]], topicPartition: TopicPartition, maxFetchSizeInBytes: Int, val fileDeleteDelayMs: Long // Visible for testing, @@ -165,10 +166,12 @@ final class KafkaMetadataLog private ( case Some(snapshotId) if (snapshotId.epoch > latestEpoch || (snapshotId.epoch == latestEpoch && snapshotId.offset > endOffset().offset)) => // Truncate the log fully if the latest snapshot is greater than the log end offset - log.truncateFullyAndStartAt(snapshotId.offset) + // Delete snapshot after truncating - removeSnapshotFilesBefore(snapshotId) + snapshots synchronized { + removeSnapshotFilesBefore(snapshotId) + } true @@ -242,85 +245,105 @@ final class KafkaMetadataLog private ( } override def readSnapshot(snapshotId: OffsetAndEpoch): Optional[RawSnapshotReader] = { - try { - if (snapshotIds.contains(snapshotId)) { - Optional.of(FileRawSnapshotReader.open(log.dir.toPath, snapshotId)) - } else { + snapshots synchronized { + val reader = snapshots.computeIfPresent(snapshotId, (_, value) => { + value.asScala.orElse { + try { + Option(FileRawSnapshotReader.open(log.dir.toPath, snapshotId)) + } catch { + case _: NoSuchFileException => + // TODO: write a log message + // Remove the entry as it doesn't exists in the filesystem + null + } + }.asJava + }) + + if (reader == null) { Optional.empty() + } else { + reader.asInstanceOf[Optional[RawSnapshotReader]] } - } catch { - case _: NoSuchFileException => - Optional.empty() } 
} override def latestSnapshotId(): Optional[OffsetAndEpoch] = { - val descending = snapshotIds.descendingIterator - if (descending.hasNext) { - Optional.of(descending.next) - } else { - Optional.empty() + snapshots synchronized { + Optional.ofNullable(snapshots.lastEntry()).map(_.getKey()) } } override def earliestSnapshotId(): Optional[OffsetAndEpoch] = { - val ascendingIterator = snapshotIds.iterator - if (ascendingIterator.hasNext) { - Optional.of(ascendingIterator.next) - } else { - Optional.empty() + snapshots synchronized { + Optional.ofNullable(snapshots.firstEntry()).map(_.getKey()) } } override def onSnapshotFrozen(snapshotId: OffsetAndEpoch): Unit = { - snapshotIds.add(snapshotId) + snapshots synchronized { + snapshots.put(snapshotId, Optional.empty()) + } } override def deleteBeforeSnapshot(logStartSnapshotId: OffsetAndEpoch): Boolean = { - latestSnapshotId().asScala match { - case Some(snapshotId) if (snapshotIds.contains(logStartSnapshotId) && - startOffset < logStartSnapshotId.offset && - logStartSnapshotId.offset <= snapshotId.offset && - log.maybeIncrementLogStartOffset(logStartSnapshotId.offset, SnapshotGenerated)) => - log.deleteOldSegments() + snapshots synchronized { + latestSnapshotId().asScala match { + case Some(snapshotId) if (snapshots.containsKey(logStartSnapshotId) && + startOffset < logStartSnapshotId.offset && + logStartSnapshotId.offset <= snapshotId.offset && + log.maybeIncrementLogStartOffset(logStartSnapshotId.offset, SnapshotGenerated)) => + + // Delete all segments that have a "last offset" less than the log start offset + log.deleteOldSegments() - // Delete snapshot after increasing LogStartOffset - removeSnapshotFilesBefore(logStartSnapshotId) + // Delete snapshot after increasing LogStartOffset + removeSnapshotFilesBefore(logStartSnapshotId) - true + true - case _ => false + case _ => false + } } } /** * Removes all snapshots on the log directory whose epoch and end offset is less than the giving epoch and end offset. 
+ * + * This method assumes that the lock for `snapshots` is ready held. */ - private def removeSnapshotFilesBefore(logStartSnapshotId: OffsetAndEpoch): Unit = { - val expiredSnapshotIdsIter = snapshotIds.headSet(logStartSnapshotId, false).iterator - while (expiredSnapshotIdsIter.hasNext) { - val snapshotId = expiredSnapshotIdsIter.next() - // If snapshotIds contains a snapshot id, the KafkaRaftClient and Listener can expect that the snapshot exists - // on the file system, so we should first remove snapshotId and then delete snapshot file. + private def removeSnapshotFilesBefore(logStartSnapshotId: OffsetAndEpoch): Boolean = { + var snapshotDeleted = false + + val expiredSnapshotIdsIter = snapshots.headMap(logStartSnapshotId, false).entrySet().iterator() + while (expiredSnapshotIdsIter.hasNext()) { + snapshotDeleted = true + + val snapshotEntry = expiredSnapshotIdsIter.next() expiredSnapshotIdsIter.remove() - val path = Snapshots.snapshotPath(log.dir.toPath, snapshotId) - val destination = Snapshots.deleteRename(path, snapshotId) - try { - Utils.atomicMoveWithFallback(path, destination, false) - } catch { - case e: IOException => - error(s"Error renaming snapshot file: $path to $destination", e) - } + Snapshots.markForDelete(log.dir.toPath, snapshotEntry.getKey()) + scheduler.schedule( "delete-snapshot-file", - () => Snapshots.deleteSnapshotIfExists(log.dir.toPath, snapshotId), - fileDeleteDelayMs) + () => { + snapshotEntry.getValue().ifPresent(_.close()) + Snapshots.deleteIfExists(log.dir.toPath, snapshotEntry.getKey()) + }, + fileDeleteDelayMs + ) } + + snapshotDeleted } override def close(): Unit = { log.close() + snapshots synchronized { + for ((_, value) <- snapshots.asScala) { + value.ifPresent(_.close()) + } + snapshots.clear() + } } } @@ -376,8 +399,8 @@ object KafkaMetadataLog { private def recoverSnapshots( log: Log - ): ConcurrentSkipListSet[OffsetAndEpoch] = { - val snapshotIds = new ConcurrentSkipListSet[OffsetAndEpoch]() + ): 
ju.TreeMap[OffsetAndEpoch, Optional[FileRawSnapshotReader]] = { + val snapshots = new ju.TreeMap[OffsetAndEpoch, Optional[FileRawSnapshotReader]]() // Scan the log directory; deleting partial snapshots and older snapshot, only remembering immutable snapshots start // from logStartOffset Files @@ -397,11 +420,11 @@ object KafkaMetadataLog { // Delete partial snapshot, deleted snapshot and older snapshot Files.deleteIfExists(snapshotPath.path) } else { - snapshotIds.add(snapshotPath.snapshotId) + snapshots.put(snapshotPath.snapshotId, Optional.empty()) } } } - snapshotIds + snapshots } } diff --git a/core/src/test/scala/kafka/raft/KafkaMetadataLogTest.scala b/core/src/test/scala/kafka/raft/KafkaMetadataLogTest.scala index c54fd4780b626..bc79db1c42160 100644 --- a/core/src/test/scala/kafka/raft/KafkaMetadataLogTest.scala +++ b/core/src/test/scala/kafka/raft/KafkaMetadataLogTest.scala @@ -99,9 +99,7 @@ final class KafkaMetadataLogTest { snapshot.freeze() } - TestUtils.resource(log.readSnapshot(snapshotId).get()) { snapshot => - assertEquals(0, snapshot.sizeInBytes()) - } + assertEquals(0, log.readSnapshot(snapshotId).get().sizeInBytes()) } @Test diff --git a/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java b/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java index 1c759514cd6c9..2d0fd6b92d1d8 100644 --- a/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java +++ b/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java @@ -1245,51 +1245,50 @@ private FetchSnapshotResponseData handleFetchSnapshotRequest( ); } - try (RawSnapshotReader snapshot = snapshotOpt.get()) { - long snapshotSize = snapshot.sizeInBytes(); - if (partitionSnapshot.position() < 0 || partitionSnapshot.position() >= snapshotSize) { - return FetchSnapshotResponse.singleton( - log.topicPartition(), - responsePartitionSnapshot -> addQuorumLeader(responsePartitionSnapshot) - .setErrorCode(Errors.POSITION_OUT_OF_RANGE.code()) - ); - } - - if (partitionSnapshot.position() 
> Integer.MAX_VALUE) { - throw new IllegalStateException( - String.format( - "Trying to fetch a snapshot with size (%s) and a position (%s) larger than %s", - snapshotSize, - partitionSnapshot.position(), - Integer.MAX_VALUE - ) - ); - } - - int maxSnapshotSize; - try { - maxSnapshotSize = Math.toIntExact(snapshotSize); - } catch (ArithmeticException e) { - maxSnapshotSize = Integer.MAX_VALUE; - } - - UnalignedRecords records = snapshot.slice(partitionSnapshot.position(), Math.min(data.maxBytes(), maxSnapshotSize)); - + RawSnapshotReader snapshot = snapshotOpt.get(); + long snapshotSize = snapshot.sizeInBytes(); + if (partitionSnapshot.position() < 0 || partitionSnapshot.position() >= snapshotSize) { return FetchSnapshotResponse.singleton( log.topicPartition(), - responsePartitionSnapshot -> { - addQuorumLeader(responsePartitionSnapshot) - .snapshotId() - .setEndOffset(snapshotId.offset) - .setEpoch(snapshotId.epoch); - - return responsePartitionSnapshot - .setSize(snapshotSize) - .setPosition(partitionSnapshot.position()) - .setUnalignedRecords(records); - } + responsePartitionSnapshot -> addQuorumLeader(responsePartitionSnapshot) + .setErrorCode(Errors.POSITION_OUT_OF_RANGE.code()) + ); + } + + if (partitionSnapshot.position() > Integer.MAX_VALUE) { + throw new IllegalStateException( + String.format( + "Trying to fetch a snapshot with size (%s) and a position (%s) larger than %s", + snapshotSize, + partitionSnapshot.position(), + Integer.MAX_VALUE + ) ); } + + int maxSnapshotSize; + try { + maxSnapshotSize = Math.toIntExact(snapshotSize); + } catch (ArithmeticException e) { + maxSnapshotSize = Integer.MAX_VALUE; + } + + UnalignedRecords records = snapshot.slice(partitionSnapshot.position(), Math.min(data.maxBytes(), maxSnapshotSize)); + + return FetchSnapshotResponse.singleton( + log.topicPartition(), + responsePartitionSnapshot -> { + addQuorumLeader(responsePartitionSnapshot) + .snapshotId() + .setEndOffset(snapshotId.offset) + .setEpoch(snapshotId.epoch); + + 
return responsePartitionSnapshot + .setSize(snapshotSize) + .setPosition(partitionSnapshot.position()) + .setUnalignedRecords(records); + } + ); } private boolean handleFetchSnapshotResponse( diff --git a/raft/src/main/java/org/apache/kafka/snapshot/FileRawSnapshotReader.java b/raft/src/main/java/org/apache/kafka/snapshot/FileRawSnapshotReader.java index 820230ee64636..3c62d6d6239bd 100644 --- a/raft/src/main/java/org/apache/kafka/snapshot/FileRawSnapshotReader.java +++ b/raft/src/main/java/org/apache/kafka/snapshot/FileRawSnapshotReader.java @@ -24,7 +24,7 @@ import java.io.IOException; import java.nio.file.Path; -public final class FileRawSnapshotReader implements RawSnapshotReader { +public final class FileRawSnapshotReader implements RawSnapshotReader, AutoCloseable { private final FileRecords fileRecords; private final OffsetAndEpoch snapshotId; @@ -54,8 +54,12 @@ public Records records() { } @Override - public void close() throws IOException { - fileRecords.close(); + public void close() { + try { + fileRecords.close(); + } catch (IOException e) { + throw new RuntimeException(e); + } } /** diff --git a/raft/src/main/java/org/apache/kafka/snapshot/RawSnapshotReader.java b/raft/src/main/java/org/apache/kafka/snapshot/RawSnapshotReader.java index 506728d10ae41..1a51999aecfa3 100644 --- a/raft/src/main/java/org/apache/kafka/snapshot/RawSnapshotReader.java +++ b/raft/src/main/java/org/apache/kafka/snapshot/RawSnapshotReader.java @@ -20,12 +20,10 @@ import org.apache.kafka.common.record.UnalignedRecords; import org.apache.kafka.raft.OffsetAndEpoch; -import java.io.Closeable; - /** * Interface for reading snapshots as a sequence of records. */ -public interface RawSnapshotReader extends Closeable { +public interface RawSnapshotReader { /** * Returns the end offset and epoch for the snapshot. 
*/ diff --git a/raft/src/main/java/org/apache/kafka/snapshot/Snapshots.java b/raft/src/main/java/org/apache/kafka/snapshot/Snapshots.java index 575358fbddd1c..29f62b6bc0527 100644 --- a/raft/src/main/java/org/apache/kafka/snapshot/Snapshots.java +++ b/raft/src/main/java/org/apache/kafka/snapshot/Snapshots.java @@ -23,6 +23,7 @@ import java.io.IOException; import java.nio.file.Files; import java.nio.file.Path; +import java.nio.file.StandardCopyOption; import java.text.NumberFormat; import java.util.Optional; @@ -50,10 +51,6 @@ static Path snapshotDir(Path logDir) { return logDir; } - public static Path snapshotPath(Path logDir, OffsetAndEpoch snapshotId) { - return snapshotDir(logDir).resolve(filenameFromSnapshotId(snapshotId) + SUFFIX); - } - static String filenameFromSnapshotId(OffsetAndEpoch snapshotId) { return String.format("%s-%s", OFFSET_FORMATTER.format(snapshotId.offset), EPOCH_FORMATTER.format(snapshotId.epoch)); } @@ -62,10 +59,14 @@ static Path moveRename(Path source, OffsetAndEpoch snapshotId) { return source.resolveSibling(filenameFromSnapshotId(snapshotId) + SUFFIX); } - public static Path deleteRename(Path source, OffsetAndEpoch snapshotId) { + static Path deleteRename(Path source, OffsetAndEpoch snapshotId) { return source.resolveSibling(filenameFromSnapshotId(snapshotId) + DELETE_SUFFIX); } + public static Path snapshotPath(Path logDir, OffsetAndEpoch snapshotId) { + return snapshotDir(logDir).resolve(filenameFromSnapshotId(snapshotId) + SUFFIX); + } + public static Path createTempFile(Path logDir, OffsetAndEpoch snapshotId) throws IOException { Path dir = snapshotDir(logDir); @@ -104,18 +105,29 @@ public static Optional parse(Path path) { } /** - * Delete the snapshot from the filesystem, the caller may firstly rename snapshot file to - * ${file}.deleted, so we try to delete the file as well as the renamed file if exists. + * Delete the snapshot from the filesystem. 
*/ - public static boolean deleteSnapshotIfExists(Path logDir, OffsetAndEpoch snapshotId) { - Path immutablePath = Snapshots.snapshotPath(logDir, snapshotId); - Path deletingPath = Snapshots.deleteRename(immutablePath, snapshotId); + public static boolean deleteIfExists(Path logDir, OffsetAndEpoch snapshotId) { + Path immutablePath = snapshotPath(logDir, snapshotId); + Path deletedPath = deleteRename(immutablePath, snapshotId); try { - return Files.deleteIfExists(immutablePath) | Files.deleteIfExists(deletingPath); + return Files.deleteIfExists(immutablePath) | Files.deleteIfExists(deletedPath); } catch (IOException e) { - log.error("Error deleting snapshot file " + deletingPath, e); + log.error("Error deleting snapshot files {} and {}", immutablePath, deletedPath, e); return false; } } + /** + * Mark a snapshot for deletion by renaming with the deleted suffix + */ + public static void markForDelete(Path logDir, OffsetAndEpoch snapshotId) { + Path immutablePath = snapshotPath(logDir, snapshotId); + Path deletedPath = deleteRename(immutablePath, snapshotId); + try { + Files.move(immutablePath, deletedPath, StandardCopyOption.ATOMIC_MOVE); + } catch (IOException e) { + log.error("Error renaming snapshot file from {} to {}", immutablePath, deletedPath, e); + } + } } diff --git a/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientSnapshotTest.java b/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientSnapshotTest.java index 6fd5147421165..2acf287961a61 100644 --- a/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientSnapshotTest.java +++ b/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientSnapshotTest.java @@ -551,32 +551,31 @@ public void testFetchSnapshotRequestAsLeader() throws Exception { snapshot.freeze(); } - try (RawSnapshotReader snapshot = context.log.readSnapshot(snapshotId).get()) { - context.deliverRequest( - fetchSnapshotRequest( - context.metadataPartition, - epoch, - snapshotId, - Integer.MAX_VALUE, - 0 - ) - ); - - 
context.client.poll(); - - FetchSnapshotResponseData.PartitionSnapshot response = context - .assertSentFetchSnapshotResponse(context.metadataPartition) - .get(); - - assertEquals(Errors.NONE, Errors.forCode(response.errorCode())); - assertEquals(snapshot.sizeInBytes(), response.size()); - assertEquals(0, response.position()); - assertEquals(snapshot.sizeInBytes(), response.unalignedRecords().sizeInBytes()); - - UnalignedMemoryRecords memoryRecords = (UnalignedMemoryRecords) snapshot.slice(0, Math.toIntExact(snapshot.sizeInBytes())); - - assertEquals(memoryRecords.buffer(), ((UnalignedMemoryRecords) response.unalignedRecords()).buffer()); - } + RawSnapshotReader snapshot = context.log.readSnapshot(snapshotId).get(); + context.deliverRequest( + fetchSnapshotRequest( + context.metadataPartition, + epoch, + snapshotId, + Integer.MAX_VALUE, + 0 + ) + ); + + context.client.poll(); + + FetchSnapshotResponseData.PartitionSnapshot response = context + .assertSentFetchSnapshotResponse(context.metadataPartition) + .get(); + + assertEquals(Errors.NONE, Errors.forCode(response.errorCode())); + assertEquals(snapshot.sizeInBytes(), response.size()); + assertEquals(0, response.position()); + assertEquals(snapshot.sizeInBytes(), response.unalignedRecords().sizeInBytes()); + + UnalignedMemoryRecords memoryRecords = (UnalignedMemoryRecords) snapshot.slice(0, Math.toIntExact(snapshot.sizeInBytes())); + + assertEquals(memoryRecords.buffer(), ((UnalignedMemoryRecords) response.unalignedRecords()).buffer()); } @Test @@ -594,62 +593,61 @@ public void testPartialFetchSnapshotRequestAsLeader() throws Exception { snapshot.freeze(); } - try (RawSnapshotReader snapshot = context.log.readSnapshot(snapshotId).get()) { - // Fetch half of the snapshot - context.deliverRequest( - fetchSnapshotRequest( - context.metadataPartition, - epoch, - snapshotId, - Math.toIntExact(snapshot.sizeInBytes() / 2), - 0 - ) - ); - - context.client.poll(); - - FetchSnapshotResponseData.PartitionSnapshot response = 
context - .assertSentFetchSnapshotResponse(context.metadataPartition) - .get(); - - assertEquals(Errors.NONE, Errors.forCode(response.errorCode())); - assertEquals(snapshot.sizeInBytes(), response.size()); - assertEquals(0, response.position()); - assertEquals(snapshot.sizeInBytes() / 2, response.unalignedRecords().sizeInBytes()); - - UnalignedMemoryRecords memoryRecords = (UnalignedMemoryRecords) snapshot.slice(0, Math.toIntExact(snapshot.sizeInBytes())); - ByteBuffer snapshotBuffer = memoryRecords.buffer(); - - ByteBuffer responseBuffer = ByteBuffer.allocate(Math.toIntExact(snapshot.sizeInBytes())); - responseBuffer.put(((UnalignedMemoryRecords) response.unalignedRecords()).buffer()); - - ByteBuffer expectedBytes = snapshotBuffer.duplicate(); - expectedBytes.limit(Math.toIntExact(snapshot.sizeInBytes() / 2)); - - assertEquals(expectedBytes, responseBuffer.duplicate().flip()); - - // Fetch the remainder of the snapshot - context.deliverRequest( - fetchSnapshotRequest( - context.metadataPartition, - epoch, - snapshotId, - Integer.MAX_VALUE, - responseBuffer.position() - ) - ); - - context.client.poll(); - - response = context.assertSentFetchSnapshotResponse(context.metadataPartition).get(); - assertEquals(Errors.NONE, Errors.forCode(response.errorCode())); - assertEquals(snapshot.sizeInBytes(), response.size()); - assertEquals(responseBuffer.position(), response.position()); - assertEquals(snapshot.sizeInBytes() - (snapshot.sizeInBytes() / 2), response.unalignedRecords().sizeInBytes()); - - responseBuffer.put(((UnalignedMemoryRecords) response.unalignedRecords()).buffer()); - assertEquals(snapshotBuffer, responseBuffer.flip()); - } + RawSnapshotReader snapshot = context.log.readSnapshot(snapshotId).get(); + // Fetch half of the snapshot + context.deliverRequest( + fetchSnapshotRequest( + context.metadataPartition, + epoch, + snapshotId, + Math.toIntExact(snapshot.sizeInBytes() / 2), + 0 + ) + ); + + context.client.poll(); + + 
FetchSnapshotResponseData.PartitionSnapshot response = context + .assertSentFetchSnapshotResponse(context.metadataPartition) + .get(); + + assertEquals(Errors.NONE, Errors.forCode(response.errorCode())); + assertEquals(snapshot.sizeInBytes(), response.size()); + assertEquals(0, response.position()); + assertEquals(snapshot.sizeInBytes() / 2, response.unalignedRecords().sizeInBytes()); + + UnalignedMemoryRecords memoryRecords = (UnalignedMemoryRecords) snapshot.slice(0, Math.toIntExact(snapshot.sizeInBytes())); + ByteBuffer snapshotBuffer = memoryRecords.buffer(); + + ByteBuffer responseBuffer = ByteBuffer.allocate(Math.toIntExact(snapshot.sizeInBytes())); + responseBuffer.put(((UnalignedMemoryRecords) response.unalignedRecords()).buffer()); + + ByteBuffer expectedBytes = snapshotBuffer.duplicate(); + expectedBytes.limit(Math.toIntExact(snapshot.sizeInBytes() / 2)); + + assertEquals(expectedBytes, responseBuffer.duplicate().flip()); + + // Fetch the remainder of the snapshot + context.deliverRequest( + fetchSnapshotRequest( + context.metadataPartition, + epoch, + snapshotId, + Integer.MAX_VALUE, + responseBuffer.position() + ) + ); + + context.client.poll(); + + response = context.assertSentFetchSnapshotResponse(context.metadataPartition).get(); + assertEquals(Errors.NONE, Errors.forCode(response.errorCode())); + assertEquals(snapshot.sizeInBytes(), response.size()); + assertEquals(responseBuffer.position(), response.position()); + assertEquals(snapshot.sizeInBytes() - (snapshot.sizeInBytes() / 2), response.unalignedRecords().sizeInBytes()); + + responseBuffer.put(((UnalignedMemoryRecords) response.unalignedRecords()).buffer()); + assertEquals(snapshotBuffer, responseBuffer.flip()); } @Test @@ -714,24 +712,23 @@ public void testFetchSnapshotRequestWithInvalidPosition() throws Exception { assertEquals(epoch, response.currentLeader().leaderEpoch()); assertEquals(localId, response.currentLeader().leaderId()); - try (RawSnapshotReader snapshot = 
context.log.readSnapshot(snapshotId).get()) { - context.deliverRequest( - fetchSnapshotRequest( - context.metadataPartition, - epoch, - snapshotId, - Integer.MAX_VALUE, - snapshot.sizeInBytes() - ) - ); - - context.client.poll(); - - response = context.assertSentFetchSnapshotResponse(context.metadataPartition).get(); - assertEquals(Errors.POSITION_OUT_OF_RANGE, Errors.forCode(response.errorCode())); - assertEquals(epoch, response.currentLeader().leaderEpoch()); - assertEquals(localId, response.currentLeader().leaderId()); - } + RawSnapshotReader snapshot = context.log.readSnapshot(snapshotId).get(); + context.deliverRequest( + fetchSnapshotRequest( + context.metadataPartition, + epoch, + snapshotId, + Integer.MAX_VALUE, + snapshot.sizeInBytes() + ) + ); + + context.client.poll(); + + response = context.assertSentFetchSnapshotResponse(context.metadataPartition).get(); + assertEquals(Errors.POSITION_OUT_OF_RANGE, Errors.forCode(response.errorCode())); + assertEquals(epoch, response.currentLeader().leaderEpoch()); + assertEquals(localId, response.currentLeader().leaderId()); } @Test @@ -909,15 +906,14 @@ public void testFetchResponseWithSnapshotId() throws Exception { context.assertFetchRequestData(fetchRequest, epoch, snapshotId.offset, snapshotId.epoch); // Check that the snapshot was written to the log - try (RawSnapshotReader snapshot = context.log.readSnapshot(snapshotId).get()) { - assertEquals(memorySnapshot.buffer().remaining(), snapshot.sizeInBytes()); - SnapshotWriterReaderTest.assertSnapshot(Arrays.asList(records), snapshot); - } + RawSnapshotReader snapshot = context.log.readSnapshot(snapshotId).get(); + assertEquals(memorySnapshot.buffer().remaining(), snapshot.sizeInBytes()); + SnapshotWriterReaderTest.assertSnapshot(Arrays.asList(records), snapshot); // Check that listener was notified of the new snapshot - try (SnapshotReader snapshot = context.listener.drainHandledSnapshot().get()) { - assertEquals(snapshotId, snapshot.snapshotId()); - 
SnapshotWriterReaderTest.assertSnapshot(Arrays.asList(records), snapshot); + try (SnapshotReader reader = context.listener.drainHandledSnapshot().get()) { + assertEquals(snapshotId, reader.snapshotId()); + SnapshotWriterReaderTest.assertSnapshot(Arrays.asList(records), reader); } } @@ -1013,15 +1009,14 @@ public void testFetchSnapshotResponsePartialData() throws Exception { context.assertFetchRequestData(fetchRequest, epoch, snapshotId.offset, snapshotId.epoch); // Check that the snapshot was written to the log - try (RawSnapshotReader snapshot = context.log.readSnapshot(snapshotId).get()) { - assertEquals(memorySnapshot.buffer().remaining(), snapshot.sizeInBytes()); - SnapshotWriterReaderTest.assertSnapshot(Arrays.asList(records), snapshot); - } + RawSnapshotReader snapshot = context.log.readSnapshot(snapshotId).get(); + assertEquals(memorySnapshot.buffer().remaining(), snapshot.sizeInBytes()); + SnapshotWriterReaderTest.assertSnapshot(Arrays.asList(records), snapshot); // Check that listener was notified of the new snapshot - try (SnapshotReader snapshot = context.listener.drainHandledSnapshot().get()) { - assertEquals(snapshotId, snapshot.snapshotId()); - SnapshotWriterReaderTest.assertSnapshot(Arrays.asList(records), snapshot); + try (SnapshotReader reader = context.listener.drainHandledSnapshot().get()) { + assertEquals(snapshotId, reader.snapshotId()); + SnapshotWriterReaderTest.assertSnapshot(Arrays.asList(records), reader); } } diff --git a/raft/src/test/java/org/apache/kafka/raft/MockLog.java b/raft/src/test/java/org/apache/kafka/raft/MockLog.java index bf03a06ee8f4f..5feb9435f43d5 100644 --- a/raft/src/test/java/org/apache/kafka/raft/MockLog.java +++ b/raft/src/test/java/org/apache/kafka/raft/MockLog.java @@ -709,8 +709,5 @@ public UnalignedRecords slice(long position, int size) { public Records records() { return data; } - - @Override - public void close() {} } } diff --git a/raft/src/test/java/org/apache/kafka/raft/MockLogTest.java 
b/raft/src/test/java/org/apache/kafka/raft/MockLogTest.java index 35e13cec65a01..4942139270d1c 100644 --- a/raft/src/test/java/org/apache/kafka/raft/MockLogTest.java +++ b/raft/src/test/java/org/apache/kafka/raft/MockLogTest.java @@ -441,9 +441,8 @@ public void testCreateSnapshot() throws IOException { snapshot.freeze(); } - try (RawSnapshotReader snapshot = log.readSnapshot(snapshotId).get()) { - assertEquals(0, snapshot.sizeInBytes()); - } + RawSnapshotReader snapshot = log.readSnapshot(snapshotId).get(); + assertEquals(0, snapshot.sizeInBytes()); } @Test diff --git a/raft/src/test/java/org/apache/kafka/snapshot/SnapshotsTest.java b/raft/src/test/java/org/apache/kafka/snapshot/SnapshotsTest.java index 7960a834ea49d..ae89543e3d1af 100644 --- a/raft/src/test/java/org/apache/kafka/snapshot/SnapshotsTest.java +++ b/raft/src/test/java/org/apache/kafka/snapshot/SnapshotsTest.java @@ -118,7 +118,7 @@ public void testDeleteSnapshot(boolean renameBeforeDeleting) throws IOException // rename snapshot before deleting Utils.atomicMoveWithFallback(snapshotPath, Snapshots.deleteRename(snapshotPath, snapshotId), false); - assertTrue(Snapshots.deleteSnapshotIfExists(logDirPath, snapshot.snapshotId())); + assertTrue(Snapshots.deleteIfExists(logDirPath, snapshot.snapshotId())); assertFalse(Files.exists(snapshotPath)); assertFalse(Files.exists(Snapshots.deleteRename(snapshotPath, snapshotId))); } From ee79a74a8cc4f7e77827b40307489fcbee7956d9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jos=C3=A9=20Armando=20Garc=C3=ADa=20Sancio?= Date: Mon, 3 May 2021 10:27:02 -0700 Subject: [PATCH 2/6] Only hold a lock when modifying `snapshots` variable --- .../scala/kafka/raft/KafkaMetadataLog.scala | 129 ++++++++++-------- 1 file changed, 75 insertions(+), 54 deletions(-) diff --git a/core/src/main/scala/kafka/raft/KafkaMetadataLog.scala b/core/src/main/scala/kafka/raft/KafkaMetadataLog.scala index 1311917899ff6..b8a4adc342d3f 100644 --- a/core/src/main/scala/kafka/raft/KafkaMetadataLog.scala +++ 
b/core/src/main/scala/kafka/raft/KafkaMetadataLog.scala @@ -19,7 +19,6 @@ package kafka.raft import java.io.File import java.nio.file.{Files, NoSuchFileException} import java.util.{Optional, Properties} -import java.{util => ju} import kafka.api.ApiVersion import kafka.log.{AppendOrigin, Log, LogConfig, LogOffsetSnapshot, SnapshotGenerated} @@ -31,15 +30,16 @@ import org.apache.kafka.common.{KafkaException, TopicPartition, Uuid} import org.apache.kafka.raft.{Isolation, LogAppendInfo, LogFetchInfo, LogOffsetMetadata, OffsetAndEpoch, OffsetMetadata, ReplicatedLog} import org.apache.kafka.snapshot.{FileRawSnapshotReader, FileRawSnapshotWriter, RawSnapshotReader, RawSnapshotWriter, SnapshotPath, Snapshots} +import scala.annotation.nowarn +import scala.collection.mutable import scala.compat.java8.OptionConverters._ -import scala.jdk.CollectionConverters._ final class KafkaMetadataLog private ( log: Log, scheduler: Scheduler, // This object needs to be thread-safe because it is used by the snapshotting thread to notify the // polling thread when snapshots are created. 
- snapshots: ju.TreeMap[OffsetAndEpoch, Optional[FileRawSnapshotReader]], + snapshots: mutable.TreeMap[OffsetAndEpoch, Option[FileRawSnapshotReader]], topicPartition: TopicPartition, maxFetchSizeInBytes: Int, val fileDeleteDelayMs: Long // Visible for testing, @@ -162,21 +162,22 @@ final class KafkaMetadataLog private ( override def truncateToLatestSnapshot(): Boolean = { val latestEpoch = log.latestEpoch.getOrElse(0) - latestSnapshotId().asScala match { + val (truncated, forgottenSnapshots) = latestSnapshotId().asScala match { case Some(snapshotId) if (snapshotId.epoch > latestEpoch || (snapshotId.epoch == latestEpoch && snapshotId.offset > endOffset().offset)) => // Truncate the log fully if the latest snapshot is greater than the log end offset log.truncateFullyAndStartAt(snapshotId.offset) - // Delete snapshot after truncating + // Forget snapshots less than the log start offset snapshots synchronized { - removeSnapshotFilesBefore(snapshotId) + (true, forgetSnapshotsBefore(snapshotId)) } - - true - - case _ => false + case _ => + (false, mutable.TreeMap.empty[OffsetAndEpoch, Option[FileRawSnapshotReader]]) } + + removeSnapshots(forgottenSnapshots) + truncated } override def initializeLeaderEpoch(epoch: Int): Unit = { @@ -246,49 +247,55 @@ final class KafkaMetadataLog private ( override def readSnapshot(snapshotId: OffsetAndEpoch): Optional[RawSnapshotReader] = { snapshots synchronized { - val reader = snapshots.computeIfPresent(snapshotId, (_, value) => { - value.asScala.orElse { + val reader = snapshots.get(snapshotId) match { + case None => + // Snapshot doesn't exists + None + case Some(None) => + // Snapshot exists but has never been read before try { - Option(FileRawSnapshotReader.open(log.dir.toPath, snapshotId)) + val snapshotReader = Some(FileRawSnapshotReader.open(log.dir.toPath, snapshotId)) + snapshots.put(snapshotId, snapshotReader) + snapshotReader } catch { case _: NoSuchFileException => - // TODO: write a log message - // Remove the entry as it 
doesn't exists in the filesystem - null + // Snapshot doesn't exist in the data dir; remove + val path = Snapshots.snapshotPath(log.dir.toPath, snapshotId) + warn(s"Couldn't read $snapshotId; expected to find snapshot file $path") + snapshots.remove(snapshotId) + None } - }.asJava - }) - - if (reader == null) { - Optional.empty() - } else { - reader.asInstanceOf[Optional[RawSnapshotReader]] + case Some(value) => + // Snapshot exists and it is already open; do nothing + value } + + reader.asJava.asInstanceOf[Optional[RawSnapshotReader]] } } override def latestSnapshotId(): Optional[OffsetAndEpoch] = { snapshots synchronized { - Optional.ofNullable(snapshots.lastEntry()).map(_.getKey()) + snapshots.lastOption.map { case (snapshotId, _) => snapshotId }.asJava } } override def earliestSnapshotId(): Optional[OffsetAndEpoch] = { snapshots synchronized { - Optional.ofNullable(snapshots.firstEntry()).map(_.getKey()) + snapshots.headOption.map { case (snapshotId, _) => snapshotId }.asJava } } override def onSnapshotFrozen(snapshotId: OffsetAndEpoch): Unit = { snapshots synchronized { - snapshots.put(snapshotId, Optional.empty()) + snapshots.put(snapshotId, None) } } override def deleteBeforeSnapshot(logStartSnapshotId: OffsetAndEpoch): Boolean = { - snapshots synchronized { + val (deleted, forgottenSnapshots) = snapshots synchronized { latestSnapshotId().asScala match { - case Some(snapshotId) if (snapshots.containsKey(logStartSnapshotId) && + case Some(snapshotId) if (snapshots.contains(logStartSnapshotId) && startOffset < logStartSnapshotId.offset && logStartSnapshotId.offset <= snapshotId.offset && log.maybeIncrementLogStartOffset(logStartSnapshotId.offset, SnapshotGenerated)) => @@ -296,51 +303,65 @@ final class KafkaMetadataLog private ( // Delete all segments that have a "last offset" less than the log start offset log.deleteOldSegments() - // Delete snapshot after increasing LogStartOffset - removeSnapshotFilesBefore(logStartSnapshotId) - - true - - case _ => false
+ // Forget snapshots less than the log start offset + (true, forgetSnapshotsBefore(logStartSnapshotId)) + case _ => + (false, mutable.TreeMap.empty[OffsetAndEpoch, Option[FileRawSnapshotReader]]) } } + + removeSnapshots(forgottenSnapshots) + deleted } /** - * Removes all snapshots on the log directory whose epoch and end offset is less than the giving epoch and end offset. + * Forget the snapshots earlier than a given snapshot id and return the associated + * snapshot readers. * * This method assumes that the lock for `snapshots` is ready held. */ - private def removeSnapshotFilesBefore(logStartSnapshotId: OffsetAndEpoch): Boolean = { - var snapshotDeleted = false - - val expiredSnapshotIdsIter = snapshots.headMap(logStartSnapshotId, false).entrySet().iterator() - while (expiredSnapshotIdsIter.hasNext()) { - snapshotDeleted = true - - val snapshotEntry = expiredSnapshotIdsIter.next() - expiredSnapshotIdsIter.remove() + @nowarn("cat=deprecation") // Needed for TreeMap.until + private def forgetSnapshotsBefore( + logStartSnapshotId: OffsetAndEpoch + ): mutable.TreeMap[OffsetAndEpoch, Option[FileRawSnapshotReader]] = { + val expiredSnapshots = snapshots.until(logStartSnapshotId).clone() + snapshots --= expiredSnapshots.keys + + expiredSnapshots + } - Snapshots.markForDelete(log.dir.toPath, snapshotEntry.getKey()) + /** + * Rename the given snapshots on the log directory. Asynchronously, close and delete the given + * snapshots. 
+ */ + private def removeSnapshots( + expiredSnapshots: mutable.TreeMap[OffsetAndEpoch, Option[FileRawSnapshotReader]] + ): Unit = { + expiredSnapshots.foreach { case (snapshotId, _) => + Snapshots.markForDelete(log.dir.toPath, snapshotId) + } + if (!expiredSnapshots.isEmpty) { scheduler.schedule( "delete-snapshot-file", () => { - snapshotEntry.getValue().ifPresent(_.close()) - Snapshots.deleteIfExists(log.dir.toPath, snapshotEntry.getKey()) + expiredSnapshots.foreach { case (snapshotId, snapshotReader) => + snapshotReader.foreach(_.close()) + Snapshots.deleteIfExists(log.dir.toPath, snapshotId) + } }, fileDeleteDelayMs ) } - - snapshotDeleted } override def close(): Unit = { log.close() snapshots synchronized { - for ((_, value) <- snapshots.asScala) { - value.ifPresent(_.close()) + snapshots.foreach { case (_, value) => + value.foreach { snapshot => + snapshot.close() + } } snapshots.clear() } @@ -399,8 +420,8 @@ object KafkaMetadataLog { private def recoverSnapshots( log: Log - ): ju.TreeMap[OffsetAndEpoch, Optional[FileRawSnapshotReader]] = { - val snapshots = new ju.TreeMap[OffsetAndEpoch, Optional[FileRawSnapshotReader]]() + ): mutable.TreeMap[OffsetAndEpoch, Option[FileRawSnapshotReader]] = { + val snapshots = mutable.TreeMap.empty[OffsetAndEpoch, Option[FileRawSnapshotReader]] // Scan the log directory; deleting partial snapshots and older snapshot, only remembering immutable snapshots start // from logStartOffset Files @@ -420,7 +441,7 @@ object KafkaMetadataLog { // Delete partial snapshot, deleted snapshot and older snapshot Files.deleteIfExists(snapshotPath.path) } else { - snapshots.put(snapshotPath.snapshotId, Optional.empty()) + snapshots.put(snapshotPath.snapshotId, None) } } } From 4b0de9b75684e9f2df65f2f616cb57cb4f62740c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jos=C3=A9=20Armando=20Garc=C3=ADa=20Sancio?= Date: Mon, 3 May 2021 10:51:29 -0700 Subject: [PATCH 3/6] Clean up code --- .../scala/kafka/raft/KafkaMetadataLog.scala | 28 +++++++++---------- 
1 file changed, 14 insertions(+), 14 deletions(-) diff --git a/core/src/main/scala/kafka/raft/KafkaMetadataLog.scala b/core/src/main/scala/kafka/raft/KafkaMetadataLog.scala index b8a4adc342d3f..54c4576bb5892 100644 --- a/core/src/main/scala/kafka/raft/KafkaMetadataLog.scala +++ b/core/src/main/scala/kafka/raft/KafkaMetadataLog.scala @@ -17,7 +17,7 @@ package kafka.raft import java.io.File -import java.nio.file.{Files, NoSuchFileException} +import java.nio.file.{Files, NoSuchFileException, Path} import java.util.{Optional, Properties} import kafka.api.ApiVersion @@ -341,15 +341,10 @@ final class KafkaMetadataLog private ( Snapshots.markForDelete(log.dir.toPath, snapshotId) } - if (!expiredSnapshots.isEmpty) { + if (expiredSnapshots.nonEmpty) { scheduler.schedule( - "delete-snapshot-file", - () => { - expiredSnapshots.foreach { case (snapshotId, snapshotReader) => - snapshotReader.foreach(_.close()) - Snapshots.deleteIfExists(log.dir.toPath, snapshotId) - } - }, + "delete-snapshot-files", + KafkaMetadataLog.deleteSnapshotFiles(log.dir.toPath, expiredSnapshots), fileDeleteDelayMs ) } @@ -358,11 +353,7 @@ final class KafkaMetadataLog private ( override def close(): Unit = { log.close() snapshots synchronized { - snapshots.foreach { case (_, value) => - value.foreach { snapshot => - snapshot.close() - } - } + snapshots.values.flatten.foreach(_.close()) snapshots.clear() } } @@ -448,4 +439,13 @@ object KafkaMetadataLog { snapshots } + private def deleteSnapshotFiles( + logDir: Path, + expiredSnapshots: mutable.TreeMap[OffsetAndEpoch, Option[FileRawSnapshotReader]] + )(): Unit = { + expiredSnapshots.foreach { case (snapshotId, snapshotReader) => + snapshotReader.foreach(_.close()) + Snapshots.deleteIfExists(logDir, snapshotId) + } + } } From eef4557651e6d4a2b34bd6d2d11339b4dce6e7f6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jos=C3=A9=20Armando=20Garc=C3=ADa=20Sancio?= Date: Fri, 7 May 2021 15:08:21 -0700 Subject: [PATCH 4/6] Keep using atomicMoveWithFallback --- 
core/src/main/scala/kafka/raft/KafkaMetadataLog.scala | 6 ++++-- raft/src/main/java/org/apache/kafka/snapshot/Snapshots.java | 4 ++-- 2 files changed, 6 insertions(+), 4 deletions(-) diff --git a/core/src/main/scala/kafka/raft/KafkaMetadataLog.scala b/core/src/main/scala/kafka/raft/KafkaMetadataLog.scala index 54c4576bb5892..7dda7cd924048 100644 --- a/core/src/main/scala/kafka/raft/KafkaMetadataLog.scala +++ b/core/src/main/scala/kafka/raft/KafkaMetadataLog.scala @@ -163,8 +163,10 @@ final class KafkaMetadataLog private ( override def truncateToLatestSnapshot(): Boolean = { val latestEpoch = log.latestEpoch.getOrElse(0) val (truncated, forgottenSnapshots) = latestSnapshotId().asScala match { - case Some(snapshotId) if (snapshotId.epoch > latestEpoch || - (snapshotId.epoch == latestEpoch && snapshotId.offset > endOffset().offset)) => + case Some(snapshotId) if ( + snapshotId.epoch > latestEpoch || + (snapshotId.epoch == latestEpoch && snapshotId.offset > endOffset().offset) + ) => // Truncate the log fully if the latest snapshot is greater than the log end offset log.truncateFullyAndStartAt(snapshotId.offset) diff --git a/raft/src/main/java/org/apache/kafka/snapshot/Snapshots.java b/raft/src/main/java/org/apache/kafka/snapshot/Snapshots.java index 29f62b6bc0527..4139099dae8af 100644 --- a/raft/src/main/java/org/apache/kafka/snapshot/Snapshots.java +++ b/raft/src/main/java/org/apache/kafka/snapshot/Snapshots.java @@ -17,13 +17,13 @@ package org.apache.kafka.snapshot; import org.apache.kafka.raft.OffsetAndEpoch; +import org.apache.kafka.common.utils.Utils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; import java.nio.file.Files; import java.nio.file.Path; -import java.nio.file.StandardCopyOption; import java.text.NumberFormat; import java.util.Optional; @@ -125,7 +125,7 @@ public static void markForDelete(Path logDir, OffsetAndEpoch snapshotId) { Path immutablePath = snapshotPath(logDir, snapshotId); Path deletedPath = 
deleteRename(immutablePath, snapshotId); try { - Files.move(immutablePath, deletedPath, StandardCopyOption.ATOMIC_MOVE); + Utils.atomicMoveWithFallback(immutablePath, deletedPath, false); } catch (IOException e) { log.error("Error renaming snapshot file from {} to {}", immutablePath, deletedPath, e); } From 1f3ce49375ab8c5deb82990045b757201406278e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jos=C3=A9=20Armando=20Garc=C3=ADa=20Sancio?= Date: Tue, 11 May 2021 10:08:18 -0700 Subject: [PATCH 5/6] Use different syntax to get around IntelliJ error --- core/src/main/scala/kafka/raft/KafkaMetadataLog.scala | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/core/src/main/scala/kafka/raft/KafkaMetadataLog.scala b/core/src/main/scala/kafka/raft/KafkaMetadataLog.scala index 7dda7cd924048..d174221caec42 100644 --- a/core/src/main/scala/kafka/raft/KafkaMetadataLog.scala +++ b/core/src/main/scala/kafka/raft/KafkaMetadataLog.scala @@ -320,7 +320,7 @@ final class KafkaMetadataLog private ( * Forget the snapshots earlier than a given snapshot id and return the associated * snapshot readers. * - * This method assumes that the lock for `snapshots` is ready held. + * This method assumes that the lock for `snapshots` is already held. */ @nowarn("cat=deprecation") // Needed for TreeMap.until private def forgetSnapshotsBefore( @@ -333,8 +333,8 @@ final class KafkaMetadataLog private ( } /** - * Rename the given snapshots on the log directory. Asynchronously, close and delete the given - * snapshots. + * Rename the given snapshots on the log directory. Asynchronously, close and delete the + * given snapshots after some delay. 
*/ private def removeSnapshots( expiredSnapshots: mutable.TreeMap[OffsetAndEpoch, Option[FileRawSnapshotReader]] @@ -444,7 +444,7 @@ object KafkaMetadataLog { private def deleteSnapshotFiles( logDir: Path, expiredSnapshots: mutable.TreeMap[OffsetAndEpoch, Option[FileRawSnapshotReader]] - )(): Unit = { + ): () => Unit = () => { expiredSnapshots.foreach { case (snapshotId, snapshotReader) => snapshotReader.foreach(_.close()) Snapshots.deleteIfExists(logDir, snapshotId) From 8617df7c84ecc2356e9c0e7c0a3489feba826e7d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jos=C3=A9=20Armando=20Garc=C3=ADa=20Sancio?= Date: Thu, 13 May 2021 12:58:50 -0700 Subject: [PATCH 6/6] Throw an exception instead of logging when renaming snapshot for delete --- core/src/main/scala/kafka/raft/KafkaMetadataLog.scala | 10 ++++++---- .../apache/kafka/snapshot/FileRawSnapshotReader.java | 10 +++++++++- .../main/java/org/apache/kafka/snapshot/Snapshots.java | 10 +++++++++- 3 files changed, 24 insertions(+), 6 deletions(-) diff --git a/core/src/main/scala/kafka/raft/KafkaMetadataLog.scala b/core/src/main/scala/kafka/raft/KafkaMetadataLog.scala index d174221caec42..5ebade240d77a 100644 --- a/core/src/main/scala/kafka/raft/KafkaMetadataLog.scala +++ b/core/src/main/scala/kafka/raft/KafkaMetadataLog.scala @@ -23,7 +23,7 @@ import java.util.{Optional, Properties} import kafka.api.ApiVersion import kafka.log.{AppendOrigin, Log, LogConfig, LogOffsetSnapshot, SnapshotGenerated} import kafka.server.{BrokerTopicStats, FetchHighWatermark, FetchLogEnd, LogDirFailureChannel} -import kafka.utils.{Logging, Scheduler} +import kafka.utils.{CoreUtils, Logging, Scheduler} import org.apache.kafka.common.record.{MemoryRecords, Records} import org.apache.kafka.common.utils.Time import org.apache.kafka.common.{KafkaException, TopicPartition, Uuid} @@ -37,8 +37,8 @@ import scala.compat.java8.OptionConverters._ final class KafkaMetadataLog private ( log: Log, scheduler: Scheduler, - // This object needs to be thread-safe 
because it is used by the snapshotting thread to notify the - // polling thread when snapshots are created. + // Access to this object needs to be synchronized because it is used by the snapshotting thread to notify the + // polling thread when snapshots are created. This object is also used to store any opened snapshot reader. snapshots: mutable.TreeMap[OffsetAndEpoch, Option[FileRawSnapshotReader]], topicPartition: TopicPartition, maxFetchSizeInBytes: Int, @@ -446,7 +446,9 @@ object KafkaMetadataLog { expiredSnapshots: mutable.TreeMap[OffsetAndEpoch, Option[FileRawSnapshotReader]] ): () => Unit = () => { expiredSnapshots.foreach { case (snapshotId, snapshotReader) => - snapshotReader.foreach(_.close()) + snapshotReader.foreach { reader => + CoreUtils.swallow(reader.close(), this) + } Snapshots.deleteIfExists(logDir, snapshotId) } } diff --git a/raft/src/main/java/org/apache/kafka/snapshot/FileRawSnapshotReader.java b/raft/src/main/java/org/apache/kafka/snapshot/FileRawSnapshotReader.java index 3c62d6d6239bd..59d3c9cc628c6 100644 --- a/raft/src/main/java/org/apache/kafka/snapshot/FileRawSnapshotReader.java +++ b/raft/src/main/java/org/apache/kafka/snapshot/FileRawSnapshotReader.java @@ -22,6 +22,7 @@ import org.apache.kafka.raft.OffsetAndEpoch; import java.io.IOException; +import java.io.UncheckedIOException; import java.nio.file.Path; public final class FileRawSnapshotReader implements RawSnapshotReader, AutoCloseable { @@ -58,7 +59,14 @@ public void close() { try { fileRecords.close(); } catch (IOException e) { - throw new RuntimeException(e); + throw new UncheckedIOException( + String.format( + "Unable to close snapshot reader %s at %s", + snapshotId, + fileRecords.file + ), + e + ); } } diff --git a/raft/src/main/java/org/apache/kafka/snapshot/Snapshots.java b/raft/src/main/java/org/apache/kafka/snapshot/Snapshots.java index 4139099dae8af..0974869733dac 100644 --- a/raft/src/main/java/org/apache/kafka/snapshot/Snapshots.java +++ 
b/raft/src/main/java/org/apache/kafka/snapshot/Snapshots.java @@ -22,6 +22,7 @@ import org.slf4j.LoggerFactory; import java.io.IOException; +import java.io.UncheckedIOException; import java.nio.file.Files; import java.nio.file.Path; import java.text.NumberFormat; @@ -127,7 +128,14 @@ public static void markForDelete(Path logDir, OffsetAndEpoch snapshotId) { try { Utils.atomicMoveWithFallback(immutablePath, deletedPath, false); } catch (IOException e) { - log.error("Error renaming snapshot file from {} to {}", immutablePath, deletedPath, e); + throw new UncheckedIOException( + String.format( + "Error renaming snapshot file from %s to %s", + immutablePath, + deletedPath + ), + e + ); } } }