From 6227e799fc8e179cc9d7199ffa1b829a0faebb1e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jos=C3=A9=20Armando=20Garc=C3=ADa=20Sancio?= Date: Fri, 16 Jul 2021 11:14:53 -0700 Subject: [PATCH] KAFKA-13098: Fix NoSuchFileException during snapshot recovery Java's FileTreeIterator throws an NoSuchFileException when visting @metadata-0/partition.metadata.tmp. This is most like do to the fact that the Log type asynchronously creates and delete that file. --- .../scala/kafka/raft/KafkaMetadataLog.scala | 22 +++++++++---------- 1 file changed, 10 insertions(+), 12 deletions(-) diff --git a/core/src/main/scala/kafka/raft/KafkaMetadataLog.scala b/core/src/main/scala/kafka/raft/KafkaMetadataLog.scala index 23c8f2450fb07..629653101b814 100644 --- a/core/src/main/scala/kafka/raft/KafkaMetadataLog.scala +++ b/core/src/main/scala/kafka/raft/KafkaMetadataLog.scala @@ -27,7 +27,7 @@ import org.apache.kafka.common.record.{ControlRecordUtils, MemoryRecords, Record import org.apache.kafka.common.utils.{BufferSupplier, Time} import org.apache.kafka.common.{KafkaException, TopicPartition, Uuid} import org.apache.kafka.raft.{Isolation, KafkaRaftClient, LogAppendInfo, LogFetchInfo, LogOffsetMetadata, OffsetAndEpoch, OffsetMetadata, ReplicatedLog, ValidOffsetAndEpoch} -import org.apache.kafka.snapshot.{FileRawSnapshotReader, FileRawSnapshotWriter, RawSnapshotReader, RawSnapshotWriter, SnapshotPath, Snapshots} +import org.apache.kafka.snapshot.{FileRawSnapshotReader, FileRawSnapshotWriter, RawSnapshotReader, RawSnapshotWriter, Snapshots} import java.io.File import java.nio.file.{Files, NoSuchFileException, Path} @@ -604,17 +604,11 @@ object KafkaMetadataLog { val snapshots = mutable.TreeMap.empty[OffsetAndEpoch, Option[FileRawSnapshotReader]] // Scan the log directory; deleting partial snapshots and older snapshot, only remembering immutable snapshots start // from logStartOffset - Files - .walk(log.dir.toPath, 1) - .map[Optional[SnapshotPath]] { path => - if (path != log.dir.toPath) { - Snapshots.parse(path) - } else { - Optional.empty() - } - } - .forEach { path => - path.ifPresent { snapshotPath => + val filesInDir = Files.newDirectoryStream(log.dir.toPath) + + try { + filesInDir.forEach { path => + Snapshots.parse(path).ifPresent { snapshotPath => if (snapshotPath.partial || snapshotPath.deleted || snapshotPath.snapshotId.offset < log.logStartOffset) { @@ -625,6 +619,10 @@ object KafkaMetadataLog { } } } + } finally { + filesInDir.close() + } + snapshots }