diff --git a/clients/src/main/java/org/apache/kafka/common/utils/Utils.java b/clients/src/main/java/org/apache/kafka/common/utils/Utils.java index 5315903cee438..bf9d0d8f16d14 100644 --- a/clients/src/main/java/org/apache/kafka/common/utils/Utils.java +++ b/clients/src/main/java/org/apache/kafka/common/utils/Utils.java @@ -1007,6 +1007,19 @@ public static void flushDir(Path path) throws IOException { } } + /** + * Flushes dirty directories to guarantee crash consistency with swallowing {@link NoSuchFileException} + * + * @throws IOException if flushing the directory fails. + */ + public static void flushDirIfExists(Path path) throws IOException { + try { + flushDir(path); + } catch (NoSuchFileException e) { + log.warn("Failed to flush directory {}", path); + } + } + /** * Closes all the provided closeables. * @throws IOException if any of the close methods throws an IOException. diff --git a/core/src/main/scala/kafka/log/LocalLog.scala b/core/src/main/scala/kafka/log/LocalLog.scala index b90178d5c5d7a..6dfd92461c97b 100644 --- a/core/src/main/scala/kafka/log/LocalLog.scala +++ b/core/src/main/scala/kafka/log/LocalLog.scala @@ -173,8 +173,11 @@ class LocalLog(@volatile private var _dir: File, val segmentsToFlush = segments.values(currentRecoveryPoint, offset) segmentsToFlush.foreach(_.flush()) // If there are any new segments, we need to flush the parent directory for crash consistency. - if (segmentsToFlush.exists(_.baseOffset >= currentRecoveryPoint)) - Utils.flushDir(dir.toPath) + if (segmentsToFlush.exists(_.baseOffset >= currentRecoveryPoint)) { + // The directory might be renamed concurrently for topic deletion, which may cause NoSuchFileException here. + // Since the directory is to be deleted anyways, we just swallow NoSuchFileException and let it go. + Utils.flushDirIfExists(dir.toPath) + } } } diff --git a/core/src/test/scala/unit/kafka/log/LocalLogTest.scala b/core/src/test/scala/unit/kafka/log/LocalLogTest.scala index 82fa5ffe8fc4e..8556bc74fa570 100644 --- a/core/src/test/scala/unit/kafka/log/LocalLogTest.scala +++ b/core/src/test/scala/unit/kafka/log/LocalLogTest.scala @@ -31,7 +31,9 @@ import org.apache.kafka.common.utils.{Time, Utils} import org.apache.kafka.server.util.{MockTime, Scheduler} import org.apache.kafka.storage.internals.log.{FetchDataInfo, LogConfig, LogDirFailureChannel, LogFileUtils, LogOffsetMetadata} import org.junit.jupiter.api.Assertions._ +import org.junit.jupiter.api.function.Executable import org.junit.jupiter.api.{AfterEach, BeforeEach, Test} +import org.mockito.Mockito.{doReturn, spy} import scala.jdk.CollectionConverters._ @@ -701,6 +703,20 @@ class LocalLogTest { assertThrows(classOf[KafkaException], () => log.roll()) } + @Test + def testFlushingNonExistentDir(): Unit = { + val spyLog = spy(log) + + val record = new SimpleRecord(mockTime.milliseconds, "a".getBytes) + appendRecords(List(record)) + mockTime.sleep(1) + val newSegment = log.roll() + + // simulate the directory is renamed concurrently + doReturn(new File("__NON_EXISTENT__"), Nil: _*).when(spyLog).dir + assertDoesNotThrow((() => spyLog.flush(newSegment.baseOffset)): Executable) + } + private def createLocalLogWithActiveSegment(dir: File = logDir, config: LogConfig, segments: LogSegments = new LogSegments(topicPartition),