From 9d743dbb19c7992a245b19495af737ed3e8e9898 Mon Sep 17 00:00:00 2001 From: Ilyas Toumlilt Date: Fri, 1 Aug 2025 17:01:41 +0200 Subject: [PATCH 1/4] FIX: Prevent race condition causing log directory to go offline A race condition can occur during replica rebalancing where a log segment's file is closed and deleted/renamed while an asynchronous flush or read operation is still pending. This would previously cause an unhandled ClosedChannelException, leading the ReplicaManager to mark the entire log directory as offline. The fix involves removing the explicit close() of the source log in replaceCurrentWithFutureLog(). By leaving the channel open, concurrent operations can complete successfully on the renamed files (which are moved to the .delete directory). The log is already scheduled for asynchronous deletion (via addLogToBeDeleted), which ensures that the log and its resources will be properly closed and deleted by the background deletion thread after the configured file delete delay. A new unit test `testReplaceCurrentWithFutureLogDoesNotCloseSourceLog` in `LogManagerTest` has been added to verify that the source log is not closed during the swap operation. 
--- .../src/main/scala/kafka/log/LogManager.scala | 1 - .../scala/unit/kafka/log/LogManagerTest.scala | 37 ++++++++++++++++++- 2 files changed, 36 insertions(+), 2 deletions(-) diff --git a/core/src/main/scala/kafka/log/LogManager.scala b/core/src/main/scala/kafka/log/LogManager.scala index bfee35061f82f..85a9bcb3330df 100755 --- a/core/src/main/scala/kafka/log/LogManager.scala +++ b/core/src/main/scala/kafka/log/LogManager.scala @@ -1249,7 +1249,6 @@ class LogManager(logDirs: Seq[File], srcLog.renameDir(UnifiedLog.logDeleteDirName(topicPartition), true) // Now that replica in source log directory has been successfully renamed for deletion. // Close the log, update checkpoint files, and enqueue this log to be deleted. - srcLog.close() val logDir = srcLog.parentDirFile val logsToCheckpoint = logsInDir(logDir) checkpointRecoveryOffsetsInDir(logDir, logsToCheckpoint) diff --git a/core/src/test/scala/unit/kafka/log/LogManagerTest.scala b/core/src/test/scala/unit/kafka/log/LogManagerTest.scala index a8946a3d1395f..b2a2bf3d54e48 100755 --- a/core/src/test/scala/unit/kafka/log/LogManagerTest.scala +++ b/core/src/test/scala/unit/kafka/log/LogManagerTest.scala @@ -43,7 +43,7 @@ import java.util.{Collections, Optional, OptionalLong, Properties} import org.apache.kafka.server.metrics.KafkaYammerMetrics import org.apache.kafka.server.storage.log.FetchIsolation import org.apache.kafka.server.util.{FileLock, KafkaScheduler, MockTime, Scheduler} -import org.apache.kafka.storage.internals.log.{CleanerConfig, FetchDataInfo, LogConfig, LogDirFailureChannel, LogMetricNames, LogManager => JLogManager, LogOffsetsListener, LogStartOffsetIncrementReason, ProducerStateManagerConfig, RemoteIndexCache, UnifiedLog} +import org.apache.kafka.storage.internals.log.{CleanerConfig, FetchDataInfo, LogConfig, LogDirFailureChannel, LogFileUtils, LogMetricNames, LogOffsetsListener, LogStartOffsetIncrementReason, ProducerStateManagerConfig, RemoteIndexCache, UnifiedLog, LogManager => JLogManager} 
import org.apache.kafka.storage.internals.checkpoint.{CleanShutdownFileHandler, OffsetCheckpointFile} import org.apache.kafka.storage.log.metrics.BrokerTopicStats import org.junit.jupiter.api.function.Executable @@ -1112,6 +1112,41 @@ class LogManagerTest { assertEquals(2, logManager.directoryIdsSet.size) } + /** + * Test that replaceCurrentWithFutureLog does not close the source log, preventing race conditions + * where a concurrent read/flush could fail with ClosedChannelException. + */ + @Test + def testReplaceCurrentWithFutureLogDoesNotCloseSourceLog(): Unit = { + val logDir1 = TestUtils.tempDir() + val logDir2 = TestUtils.tempDir() + logManager = createLogManager(Seq(logDir1, logDir2)) + logManager.startup(Set.empty) + + val topicName = "replace-log" + val tp = new TopicPartition(topicName, 0) + val currentLog = logManager.getOrCreateLog(tp, topicId = Optional.empty) + // Create a future log in a different directory + logManager.maybeUpdatePreferredLogDir(tp, logDir2.getAbsolutePath) + logManager.getOrCreateLog(tp, isFuture = true, topicId = Optional.empty) + + // Spy on the source log to verify close() is not called + val spyCurrentLog = spy(currentLog) + // Inject the spy into the map + val field = classOf[LogManager].getDeclaredField("currentLogs") + field.setAccessible(true) + val currentLogs = field.get(logManager).asInstanceOf[ConcurrentHashMap[TopicPartition, UnifiedLog]] + currentLogs.put(tp, spyCurrentLog) + + logManager.replaceCurrentWithFutureLog(tp) + + // Verify close() was NOT called on the source log + verify(spyCurrentLog, never()).close() + + // Verify the source log was renamed to .delete + assertTrue(spyCurrentLog.dir.getName.endsWith(LogFileUtils.DELETE_DIR_SUFFIX)) + } + @Test def testCheckpointLogStartOffsetForRemoteTopic(): Unit = { logManager.shutdown() From fb4951ac41c542047807bf62c2efa1b4493dd760 Mon Sep 17 00:00:00 2001 From: itoumlilt Date: Mon, 12 Jan 2026 09:53:14 +0100 Subject: [PATCH 2/4] Address PR feedback: update comment and 
fix import ordering --- core/src/main/scala/kafka/log/LogManager.scala | 9 +++++++-- core/src/test/scala/unit/kafka/log/LogManagerTest.scala | 2 +- 2 files changed, 8 insertions(+), 3 deletions(-) diff --git a/core/src/main/scala/kafka/log/LogManager.scala b/core/src/main/scala/kafka/log/LogManager.scala index 85a9bcb3330df..1d9e4edffc9eb 100755 --- a/core/src/main/scala/kafka/log/LogManager.scala +++ b/core/src/main/scala/kafka/log/LogManager.scala @@ -1247,8 +1247,13 @@ class LogManager(logDirs: Seq[File], try { sourceLog.foreach { srcLog => srcLog.renameDir(UnifiedLog.logDeleteDirName(topicPartition), true) - // Now that replica in source log directory has been successfully renamed for deletion. - // Close the log, update checkpoint files, and enqueue this log to be deleted. + // Now that replica in source log directory has been successfully renamed for deletion, + // update checkpoint files and enqueue this log to be deleted. + // Note: We intentionally do NOT close the log here to avoid race conditions where concurrent + // operations (e.g., log flusher, fetch requests) might encounter ClosedChannelException. + // The log will be deleted asynchronously by the background delete-logs thread. + // File handles are intentionally left open; Unix semantics allow the renamed files + // to remain accessible until all handles are closed. 
val logDir = srcLog.parentDirFile val logsToCheckpoint = logsInDir(logDir) checkpointRecoveryOffsetsInDir(logDir, logsToCheckpoint) diff --git a/core/src/test/scala/unit/kafka/log/LogManagerTest.scala b/core/src/test/scala/unit/kafka/log/LogManagerTest.scala index b2a2bf3d54e48..378f50f119041 100755 --- a/core/src/test/scala/unit/kafka/log/LogManagerTest.scala +++ b/core/src/test/scala/unit/kafka/log/LogManagerTest.scala @@ -43,7 +43,7 @@ import java.util.{Collections, Optional, OptionalLong, Properties} import org.apache.kafka.server.metrics.KafkaYammerMetrics import org.apache.kafka.server.storage.log.FetchIsolation import org.apache.kafka.server.util.{FileLock, KafkaScheduler, MockTime, Scheduler} -import org.apache.kafka.storage.internals.log.{CleanerConfig, FetchDataInfo, LogConfig, LogDirFailureChannel, LogFileUtils, LogMetricNames, LogOffsetsListener, LogStartOffsetIncrementReason, ProducerStateManagerConfig, RemoteIndexCache, UnifiedLog, LogManager => JLogManager} +import org.apache.kafka.storage.internals.log.{CleanerConfig, FetchDataInfo, LogConfig, LogDirFailureChannel, LogFileUtils, LogMetricNames, LogManager => JLogManager, LogOffsetsListener, LogStartOffsetIncrementReason, ProducerStateManagerConfig, RemoteIndexCache, UnifiedLog} import org.apache.kafka.storage.internals.checkpoint.{CleanShutdownFileHandler, OffsetCheckpointFile} import org.apache.kafka.storage.log.metrics.BrokerTopicStats import org.junit.jupiter.api.function.Executable From 1776ca43db3eb8670495386e2657e73a7cd7dfc0 Mon Sep 17 00:00:00 2001 From: itoumlilt Date: Tue, 13 Jan 2026 09:57:15 +0100 Subject: [PATCH 3/4] Add flush verification to test race condition fix Verify that flush() can be called without ClosedChannelException after replaceCurrentWithFutureLog, confirming the race condition is resolved. 
--- core/src/test/scala/unit/kafka/log/LogManagerTest.scala | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/core/src/test/scala/unit/kafka/log/LogManagerTest.scala b/core/src/test/scala/unit/kafka/log/LogManagerTest.scala index 378f50f119041..bb3bc2888d04f 100755 --- a/core/src/test/scala/unit/kafka/log/LogManagerTest.scala +++ b/core/src/test/scala/unit/kafka/log/LogManagerTest.scala @@ -1145,6 +1145,10 @@ class LogManagerTest { // Verify the source log was renamed to .delete assertTrue(spyCurrentLog.dir.getName.endsWith(LogFileUtils.DELETE_DIR_SUFFIX)) + + // Verify that flush() can be called without error (no ClosedChannelException) + val flushLog: Executable = () => spyCurrentLog.flush(false) + assertDoesNotThrow(flushLog) } @Test From da387d072c8cb85cbd9da4b8bedda5ed22eb2a89 Mon Sep 17 00:00:00 2001 From: itoumlilt Date: Tue, 13 Jan 2026 12:44:24 +0100 Subject: [PATCH 4/4] Mock logEndOffset to trigger actual flush in test The flush only occurs when flushOffset > localLog.recoveryPoint() --- core/src/test/scala/unit/kafka/log/LogManagerTest.scala | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/core/src/test/scala/unit/kafka/log/LogManagerTest.scala b/core/src/test/scala/unit/kafka/log/LogManagerTest.scala index bb3bc2888d04f..7736f6736e96b 100755 --- a/core/src/test/scala/unit/kafka/log/LogManagerTest.scala +++ b/core/src/test/scala/unit/kafka/log/LogManagerTest.scala @@ -31,7 +31,7 @@ import org.junit.jupiter.api.Assertions._ import org.junit.jupiter.api.{AfterEach, BeforeEach, Test} import org.mockito.ArgumentMatchers.any import org.mockito.{ArgumentCaptor, ArgumentMatchers} -import org.mockito.Mockito.{doAnswer, doNothing, mock, never, spy, times, verify} +import org.mockito.Mockito.{doAnswer, doNothing, mock, never, spy, times, verify, when} import java.io._ import java.lang.{Long => JLong} @@ -1147,6 +1147,8 @@ class LogManagerTest { assertTrue(spyCurrentLog.dir.getName.endsWith(LogFileUtils.DELETE_DIR_SUFFIX)) // Verify that 
flush() can be called without error (no ClosedChannelException) + // Mock logEndOffset > 0 to trigger actual flush (flush only happens when flushOffset > recoveryPoint) + when(spyCurrentLog.logEndOffset()).thenReturn(100L) val flushLog: Executable = () => spyCurrentLog.flush(false) assertDoesNotThrow(flushLog) }