From 9280be76e2556c0ede40fe9d293bf63a64906b31 Mon Sep 17 00:00:00 2001 From: Haruki Okada Date: Wed, 23 Aug 2023 23:42:55 +0900 Subject: [PATCH 1/2] KAFKA-15391: Handle concurrent dir rename which makes log-dir to be offline unexpectedly --- .../org/apache/kafka/common/utils/Utils.java | 13 +++++++++++++ core/src/main/scala/kafka/log/LocalLog.scala | 7 +++++-- .../test/scala/unit/kafka/log/LocalLogTest.scala | 16 ++++++++++++++++ 3 files changed, 34 insertions(+), 2 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/common/utils/Utils.java b/clients/src/main/java/org/apache/kafka/common/utils/Utils.java index 5315903cee438..bf9d0d8f16d14 100644 --- a/clients/src/main/java/org/apache/kafka/common/utils/Utils.java +++ b/clients/src/main/java/org/apache/kafka/common/utils/Utils.java @@ -1007,6 +1007,19 @@ public static void flushDir(Path path) throws IOException { } } + /** + * Flushes dirty directories to guarantee crash consistency with swallowing {@link NoSuchFileException} + * + * @throws IOException if flushing the directory fails. + */ + public static void flushDirIfExists(Path path) throws IOException { + try { + flushDir(path); + } catch (NoSuchFileException e) { + log.warn("Failed to flush directory {}", path); + } + } + /** * Closes all the provided closeables. * @throws IOException if any of the close methods throws an IOException. diff --git a/core/src/main/scala/kafka/log/LocalLog.scala b/core/src/main/scala/kafka/log/LocalLog.scala index b90178d5c5d7a..6dfd92461c97b 100644 --- a/core/src/main/scala/kafka/log/LocalLog.scala +++ b/core/src/main/scala/kafka/log/LocalLog.scala @@ -173,8 +173,11 @@ class LocalLog(@volatile private var _dir: File, val segmentsToFlush = segments.values(currentRecoveryPoint, offset) segmentsToFlush.foreach(_.flush()) // If there are any new segments, we need to flush the parent directory for crash consistency. - if (segmentsToFlush.exists(_.baseOffset >= currentRecoveryPoint)) - Utils.flushDir(dir.toPath) + if (segmentsToFlush.exists(_.baseOffset >= currentRecoveryPoint)) { + // The directory might be renamed concurrently for topic deletion, which may cause NoSuchFileException here. + // Since the directory is to be deleted anyways, we just swallow NoSuchFileException and let it go. + Utils.flushDirIfExists(dir.toPath) + } } } diff --git a/core/src/test/scala/unit/kafka/log/LocalLogTest.scala b/core/src/test/scala/unit/kafka/log/LocalLogTest.scala index 82fa5ffe8fc4e..8f6bbc299ff9c 100644 --- a/core/src/test/scala/unit/kafka/log/LocalLogTest.scala +++ b/core/src/test/scala/unit/kafka/log/LocalLogTest.scala @@ -31,7 +31,9 @@ import org.apache.kafka.common.utils.{Time, Utils} import org.apache.kafka.server.util.{MockTime, Scheduler} import org.apache.kafka.storage.internals.log.{FetchDataInfo, LogConfig, LogDirFailureChannel, LogFileUtils, LogOffsetMetadata} import org.junit.jupiter.api.Assertions._ +import org.junit.jupiter.api.function.Executable import org.junit.jupiter.api.{AfterEach, BeforeEach, Test} +import org.mockito.Mockito.{doReturn, spy} import scala.jdk.CollectionConverters._ @@ -701,6 +703,20 @@ class LocalLogTest { assertThrows(classOf[KafkaException], () => log.roll()) } + @Test + def testFlushingNonExistentDir(): Unit = { + val spyLog = spy(log) + + val record = new SimpleRecord(mockTime.milliseconds, "a".getBytes) + appendRecords(List(record)) + mockTime.sleep(1) + val newSegment = log.roll() + + // simulate the directory is renamed concurrently + doReturn(new File("__NON_EXISTENT__")).when(spyLog).dir + assertDoesNotThrow((() => spyLog.flush(newSegment.baseOffset)): Executable) + } + private def createLocalLogWithActiveSegment(dir: File = logDir, config: LogConfig, segments: LogSegments = new LogSegments(topicPartition), From 16b5d9b35d46b0841d78e1a1f9cb00ecef80e880 Mon Sep 17 00:00:00 2001 From: Haruki Okada Date: Thu, 24 Aug 2023 07:10:50 +0900 Subject: [PATCH 2/2] fix compilation failure --- core/src/test/scala/unit/kafka/log/LocalLogTest.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/test/scala/unit/kafka/log/LocalLogTest.scala b/core/src/test/scala/unit/kafka/log/LocalLogTest.scala index 8f6bbc299ff9c..8556bc74fa570 100644 --- a/core/src/test/scala/unit/kafka/log/LocalLogTest.scala +++ b/core/src/test/scala/unit/kafka/log/LocalLogTest.scala @@ -713,7 +713,7 @@ class LocalLogTest { val newSegment = log.roll() // simulate the directory is renamed concurrently - doReturn(new File("__NON_EXISTENT__")).when(spyLog).dir + doReturn(new File("__NON_EXISTENT__"), Nil: _*).when(spyLog).dir assertDoesNotThrow((() => spyLog.flush(newSegment.baseOffset)): Executable) }