From 193a3ceb9ecd7c2a78eec7b9638caf5f0fee4ca3 Mon Sep 17 00:00:00 2001 From: Lucian Ilie Date: Fri, 13 Oct 2023 11:57:07 +0300 Subject: [PATCH] KAFKA-15572: Race condition between log dir roll and log rename dir --- .../org/apache/kafka/common/utils/Utils.java | 13 ------------- core/src/main/scala/kafka/log/LocalLog.scala | 4 +--- core/src/main/scala/kafka/log/UnifiedLog.scala | 3 ++- .../test/scala/unit/kafka/log/LocalLogTest.scala | 16 ---------------- 4 files changed, 3 insertions(+), 33 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/common/utils/Utils.java b/clients/src/main/java/org/apache/kafka/common/utils/Utils.java index bf9d0d8f16d14..5315903cee438 100644 --- a/clients/src/main/java/org/apache/kafka/common/utils/Utils.java +++ b/clients/src/main/java/org/apache/kafka/common/utils/Utils.java @@ -1007,19 +1007,6 @@ public static void flushDir(Path path) throws IOException { } } - /** - * Flushes dirty directories to guarantee crash consistency with swallowing {@link NoSuchFileException} - * - * @throws IOException if flushing the directory fails. - */ - public static void flushDirIfExists(Path path) throws IOException { - try { - flushDir(path); - } catch (NoSuchFileException e) { - log.warn("Failed to flush directory {}", path); - } - } - /** * Closes all the provided closeables. * @throws IOException if any of the close methods throws an IOException. diff --git a/core/src/main/scala/kafka/log/LocalLog.scala b/core/src/main/scala/kafka/log/LocalLog.scala index 6dfd92461c97b..c06999705bd4a 100644 --- a/core/src/main/scala/kafka/log/LocalLog.scala +++ b/core/src/main/scala/kafka/log/LocalLog.scala @@ -174,9 +174,7 @@ class LocalLog(@volatile private var _dir: File, segmentsToFlush.foreach(_.flush()) // If there are any new segments, we need to flush the parent directory for crash consistency. if (segmentsToFlush.exists(_.baseOffset >= currentRecoveryPoint)) { - // The directory might be renamed concurrently for topic deletion, which may cause NoSuchFileException here. - // Since the directory is to be deleted anyways, we just swallow NoSuchFileException and let it go. - Utils.flushDirIfExists(dir.toPath) + Utils.flushDir(dir.toPath) } } } diff --git a/core/src/main/scala/kafka/log/UnifiedLog.scala b/core/src/main/scala/kafka/log/UnifiedLog.scala index 601d444d0e69c..1b03c8cd65f02 100644 --- a/core/src/main/scala/kafka/log/UnifiedLog.scala +++ b/core/src/main/scala/kafka/log/UnifiedLog.scala @@ -1648,8 +1648,9 @@ class UnifiedLog(@volatile var logStartOffset: Long, debug(s"Flushing log up to offset $offset ($includingOffsetStr)" + s"with recovery point $newRecoveryPoint, last flushed: $lastFlushTime, current time: ${time.milliseconds()}," + s"unflushed: ${localLog.unflushedMessages}") - localLog.flush(flushOffset) lock synchronized { + // Flushing under lock, as log directory can be concurrently renamed in LocalLog.renameDir + localLog.flush(flushOffset) localLog.markFlushed(newRecoveryPoint) } } diff --git a/core/src/test/scala/unit/kafka/log/LocalLogTest.scala b/core/src/test/scala/unit/kafka/log/LocalLogTest.scala index 8556bc74fa570..82fa5ffe8fc4e 100644 --- a/core/src/test/scala/unit/kafka/log/LocalLogTest.scala +++ b/core/src/test/scala/unit/kafka/log/LocalLogTest.scala @@ -31,9 +31,7 @@ import org.apache.kafka.common.utils.{Time, Utils} import org.apache.kafka.server.util.{MockTime, Scheduler} import org.apache.kafka.storage.internals.log.{FetchDataInfo, LogConfig, LogDirFailureChannel, LogFileUtils, LogOffsetMetadata} import org.junit.jupiter.api.Assertions._ -import org.junit.jupiter.api.function.Executable import org.junit.jupiter.api.{AfterEach, BeforeEach, Test} -import org.mockito.Mockito.{doReturn, spy} import scala.jdk.CollectionConverters._ @@ -703,20 +701,6 @@ class LocalLogTest { assertThrows(classOf[KafkaException], () => log.roll()) } - @Test - def testFlushingNonExistentDir(): Unit = { - val spyLog = spy(log) - - val record = new SimpleRecord(mockTime.milliseconds, "a".getBytes) - appendRecords(List(record)) - mockTime.sleep(1) - val newSegment = log.roll() - - // simulate the directory is renamed concurrently - doReturn(new File("__NON_EXISTENT__"), Nil: _*).when(spyLog).dir - assertDoesNotThrow((() => spyLog.flush(newSegment.baseOffset)): Executable) - } - private def createLocalLogWithActiveSegment(dir: File = logDir, config: LogConfig, segments: LogSegments = new LogSegments(topicPartition),