Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 7 additions & 3 deletions core/src/main/scala/kafka/log/LogManager.scala
Original file line number Diff line number Diff line change
Expand Up @@ -1247,9 +1247,13 @@ class LogManager(logDirs: Seq[File],
try {
sourceLog.foreach { srcLog =>
srcLog.renameDir(UnifiedLog.logDeleteDirName(topicPartition), true)
// Now that replica in source log directory has been successfully renamed for deletion.
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we decide to not close the log here, can we update the comment too? including explaining why

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done in 19057e4, updated the comment to explain the reasoning.

// Close the log, update checkpoint files, and enqueue this log to be deleted.
srcLog.close()
// Now that replica in source log directory has been successfully renamed for deletion,
// update checkpoint files and enqueue this log to be deleted.
// Note: We intentionally do NOT close the log here to avoid race conditions where concurrent
// operations (e.g., log flusher, fetch requests) might encounter ClosedChannelException.
// The log will be deleted asynchronously by the background delete-logs thread.
// File handles are intentionally left open; Unix semantics allow the renamed files
// to remain accessible until all handles are closed.
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

    // The log will be deleted asynchronously by the background delete-logs thread.
    // File handles are intentionally left open; Unix semantics allow the renamed files
    // to remain accessible until all handles are closed.

How about the following?

File handles are intentionally left open; Unix semantics allow the renamed files
to remain accessible until all handles are closed.
The log will be deleted asynchronously by the background delete-logs thread.
File handles are closed and files are deleted after a configured delay (log.segment.delete.delay.ms).
At that time, the expectation is that no other concurrent operations need to access
the deleted file handles any more.

val logDir = srcLog.parentDirFile
val logsToCheckpoint = logsInDir(logDir)
checkpointRecoveryOffsetsInDir(logDir, logsToCheckpoint)
Expand Down
45 changes: 43 additions & 2 deletions core/src/test/scala/unit/kafka/log/LogManagerTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
import org.mockito.ArgumentMatchers.any
import org.mockito.{ArgumentCaptor, ArgumentMatchers}
import org.mockito.Mockito.{doAnswer, doNothing, mock, never, spy, times, verify}
import org.mockito.Mockito.{doAnswer, doNothing, mock, never, spy, times, verify, when}

import java.io._
import java.lang.{Long => JLong}
Expand All @@ -43,7 +43,7 @@ import java.util.{Collections, Optional, OptionalLong, Properties}
import org.apache.kafka.server.metrics.KafkaYammerMetrics
import org.apache.kafka.server.storage.log.FetchIsolation
import org.apache.kafka.server.util.{FileLock, KafkaScheduler, MockTime, Scheduler}
import org.apache.kafka.storage.internals.log.{CleanerConfig, FetchDataInfo, LogConfig, LogDirFailureChannel, LogMetricNames, LogManager => JLogManager, LogOffsetsListener, LogStartOffsetIncrementReason, ProducerStateManagerConfig, RemoteIndexCache, UnifiedLog}
import org.apache.kafka.storage.internals.log.{CleanerConfig, FetchDataInfo, LogConfig, LogDirFailureChannel, LogFileUtils, LogMetricNames, LogManager => JLogManager, LogOffsetsListener, LogStartOffsetIncrementReason, ProducerStateManagerConfig, RemoteIndexCache, UnifiedLog}
import org.apache.kafka.storage.internals.checkpoint.{CleanShutdownFileHandler, OffsetCheckpointFile}
import org.apache.kafka.storage.log.metrics.BrokerTopicStats
import org.junit.jupiter.api.function.Executable
Expand Down Expand Up @@ -1112,6 +1112,47 @@ class LogManagerTest {
assertEquals(2, logManager.directoryIdsSet.size)
}

/**
* Test that replaceCurrentWithFutureLog does not close the source log, preventing race conditions
* where a concurrent read/flush could fail with ClosedChannelException.
*/
@Test
def testReplaceCurrentWithFutureLogDoesNotCloseSourceLog(): Unit = {
val logDir1 = TestUtils.tempDir()
val logDir2 = TestUtils.tempDir()
logManager = createLogManager(Seq(logDir1, logDir2))
logManager.startup(Set.empty)

val topicName = "replace-log"
val tp = new TopicPartition(topicName, 0)
val currentLog = logManager.getOrCreateLog(tp, topicId = Optional.empty)
// Create a future log in a different directory
logManager.maybeUpdatePreferredLogDir(tp, logDir2.getAbsolutePath)
logManager.getOrCreateLog(tp, isFuture = true, topicId = Optional.empty)

// Spy on the source log to verify close() is not called
val spyCurrentLog = spy(currentLog)
// Inject the spy into the map
val field = classOf[LogManager].getDeclaredField("currentLogs")
field.setAccessible(true)
val currentLogs = field.get(logManager).asInstanceOf[ConcurrentHashMap[TopicPartition, UnifiedLog]]
currentLogs.put(tp, spyCurrentLog)

logManager.replaceCurrentWithFutureLog(tp)

// Verify close() was NOT called on the source log
verify(spyCurrentLog, never()).close()

// Verify the source log was renamed to .delete
assertTrue(spyCurrentLog.dir.getName.endsWith(LogFileUtils.DELETE_DIR_SUFFIX))
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could we also verify that in this situation (i.e. after replaceCurrentWithFutureLog is invoked without channel closed), the logSegment.flush() can be invoked without error?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point, thanks! Added in a1f4547.


// Verify that flush() can be called without error (no ClosedChannelException)
// Mock logEndOffset > 0 to trigger actual flush (flush only happens when flushOffset > recoveryPoint)
when(spyCurrentLog.logEndOffset()).thenReturn(100L)
val flushLog: Executable = () => spyCurrentLog.flush(false)
assertDoesNotThrow(flushLog)
}

@Test
def testCheckpointLogStartOffsetForRemoteTopic(): Unit = {
logManager.shutdown()
Expand Down