diff --git a/core/src/main/scala/kafka/raft/KafkaMetadataLog.scala b/core/src/main/scala/kafka/raft/KafkaMetadataLog.scala index 23c8f2450fb07..629653101b814 100644 --- a/core/src/main/scala/kafka/raft/KafkaMetadataLog.scala +++ b/core/src/main/scala/kafka/raft/KafkaMetadataLog.scala @@ -27,7 +27,7 @@ import org.apache.kafka.common.record.{ControlRecordUtils, MemoryRecords, Record import org.apache.kafka.common.utils.{BufferSupplier, Time} import org.apache.kafka.common.{KafkaException, TopicPartition, Uuid} import org.apache.kafka.raft.{Isolation, KafkaRaftClient, LogAppendInfo, LogFetchInfo, LogOffsetMetadata, OffsetAndEpoch, OffsetMetadata, ReplicatedLog, ValidOffsetAndEpoch} -import org.apache.kafka.snapshot.{FileRawSnapshotReader, FileRawSnapshotWriter, RawSnapshotReader, RawSnapshotWriter, SnapshotPath, Snapshots} +import org.apache.kafka.snapshot.{FileRawSnapshotReader, FileRawSnapshotWriter, RawSnapshotReader, RawSnapshotWriter, Snapshots} import java.io.File import java.nio.file.{Files, NoSuchFileException, Path} @@ -604,17 +604,11 @@ object KafkaMetadataLog { val snapshots = mutable.TreeMap.empty[OffsetAndEpoch, Option[FileRawSnapshotReader]] // Scan the log directory; deleting partial snapshots and older snapshot, only remembering immutable snapshots start // from logStartOffset - Files - .walk(log.dir.toPath, 1) - .map[Optional[SnapshotPath]] { path => - if (path != log.dir.toPath) { - Snapshots.parse(path) - } else { - Optional.empty() - } - } - .forEach { path => - path.ifPresent { snapshotPath => + val filesInDir = Files.newDirectoryStream(log.dir.toPath) + + try { + filesInDir.forEach { path => + Snapshots.parse(path).ifPresent { snapshotPath => if (snapshotPath.partial || snapshotPath.deleted || snapshotPath.snapshotId.offset < log.logStartOffset) { @@ -625,6 +619,10 @@ object KafkaMetadataLog { } } } + } finally { + filesInDir.close() + } + snapshots }