Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 29 additions & 8 deletions core/src/main/scala/kafka/log/Log.scala
Original file line number Diff line number Diff line change
Expand Up @@ -577,14 +577,6 @@ class Log(@volatile private var _dir: File,
partitionMetadataFile = new PartitionMetadataFile(partitionMetadata, logDirFailureChannel)
}

/** Only used for ZK clusters when we update and start using topic IDs on existing topics */
def assignTopicId(topicId: Uuid): Unit = {
if (keepPartitionMetadataFile) {
partitionMetadataFile.write(topicId)
this.topicId = topicId
}
}

private def initializeLeaderEpochCache(): Unit = lock synchronized {
val leaderEpochFile = LeaderEpochCheckpointFile.newFile(dir)

Expand All @@ -609,6 +601,29 @@ class Log(@volatile private var _dir: File,
}
}

/**
 * Flush any pending (recorded but not yet persisted) topic ID to the partition
 * metadata file on disk. Delegates entirely to PartitionMetadataFile.maybeFlush(),
 * which is a no-op when nothing is dirty.
 */
private def maybeFlushMetadataFile(): Unit = {
partitionMetadataFile.maybeFlush()
}

/**
 * Only used for ZK clusters when we update and start using topic IDs on existing topics.
 *
 * Records the topic ID for this log and schedules an asynchronous flush of the
 * partition metadata file if one does not already exist on disk.
 *
 * @param topicId the topic ID to assign to this log
 * @throws InconsistentTopicIdException if a different topic ID was already assigned
 */
def assignTopicId(topicId: Uuid): Unit = {
  // A non-zero stored ID means this log already has an assignment; it must match the incoming ID.
  if (!this.topicId.equals(Uuid.ZERO_UUID) && !this.topicId.equals(topicId)) {
    // we should never get here as the topic IDs should have been checked in becomeLeaderOrFollower
    // NOTE: added the missing space after the comma so the message does not render as ",but".
    throw new InconsistentTopicIdException(s"Tried to assign topic ID $topicId to log for topic partition $topicPartition, " +
      s"but log already contained topic ID ${this.topicId}")
  }

  if (keepPartitionMetadataFile) {
    this.topicId = topicId
    if (!partitionMetadataFile.exists()) {
      // Record the ID now; the actual disk write happens on the scheduler thread.
      partitionMetadataFile.record(topicId)
      scheduler.schedule("flush-metadata-file", maybeFlushMetadataFile)
    }
  }
}

/**
* Removes any temporary files found in log directory, and creates a list of all .swap files which could be swapped
* in place of existing segment(s). For log splitting, we know that any .swap file whose base offset is higher than
Expand Down Expand Up @@ -1056,6 +1071,7 @@ class Log(@volatile private var _dir: File,
def close(): Unit = {
debug("Closing log")
lock synchronized {
maybeFlushMetadataFile()
checkIfMemoryMappedBufferClosed()
producerExpireCheck.cancel(true)
maybeHandleIOException(s"Error while renaming dir for $topicPartition in dir ${dir.getParent}") {
Expand All @@ -1076,6 +1092,8 @@ class Log(@volatile private var _dir: File,
def renameDir(name: String): Unit = {
lock synchronized {
maybeHandleIOException(s"Error while renaming dir for $topicPartition in log dir ${dir.getParent}") {
// Flush partitionMetadata file before initializing again
maybeFlushMetadataFile()
val renamedDir = new File(dir.getParent, name)
Utils.atomicMoveWithFallback(dir.toPath, renamedDir.toPath)
if (renamedDir != dir) {
Expand Down Expand Up @@ -1160,6 +1178,9 @@ class Log(@volatile private var _dir: File,
validateAndAssignOffsets: Boolean,
leaderEpoch: Int,
ignoreRecordSize: Boolean): LogAppendInfo = {
// We want to ensure the partition metadata file is written to the log dir before any log data is written to disk.
// This will ensure that any log data can be recovered with the correct topic ID in the case of failure.
maybeFlushMetadataFile()

val appendInfo = analyzeAndValidateRecords(records, origin, ignoreRecordSize, leaderEpoch)

Expand Down
61 changes: 41 additions & 20 deletions core/src/main/scala/kafka/server/PartitionMetadataFile.scala
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import java.util.regex.Pattern

import kafka.utils.Logging
import org.apache.kafka.common.Uuid
import org.apache.kafka.common.errors.KafkaStorageException
import org.apache.kafka.common.errors.{InconsistentTopicIdException, KafkaStorageException}
import org.apache.kafka.common.utils.Utils


Expand Down Expand Up @@ -90,27 +90,48 @@ class PartitionMetadataFile(val file: File,
private val tempPath = Paths.get(path.toString + ".tmp")
private val lock = new Object()
private val logDir = file.getParentFile.getParent
@volatile private var dirtyTopicIdOpt : Option[Uuid] = None

/**
 * Record the topic ID that should eventually be flushed to this partition's
 * metadata file. The actual disk write is deferred until maybeFlush() runs.
 *
 * @param topicId the topic ID to mark dirty for a later flush
 * @throws InconsistentTopicIdException if a different topic ID was already recorded
 */
def record(topicId: Uuid): Unit = {
  // Topic IDs should not differ, but we defensively check here to fail earlier in the case that the IDs somehow differ.
  dirtyTopicIdOpt match {
    case Some(previouslyRecorded) if previouslyRecorded != topicId =>
      throw new InconsistentTopicIdException(s"Tried to record topic ID $topicId to file " +
        s"but had already recorded $previouslyRecorded")
    case _ =>
      dirtyTopicIdOpt = Some(topicId)
  }
}

def write(topicId: Uuid): Unit = {
lock synchronized {
try {
// write to temp file and then swap with the existing file
val fileOutputStream = new FileOutputStream(tempPath.toFile)
val writer = new BufferedWriter(new OutputStreamWriter(fileOutputStream, StandardCharsets.UTF_8))
try {
writer.write(PartitionMetadataFileFormatter.toFile(new PartitionMetadata(CurrentVersion,topicId)))
writer.flush()
fileOutputStream.getFD().sync()
} finally {
writer.close()
}
/**
 * Flush the pending (dirty) topic ID, if any, to the partition metadata file.
 *
 * The data is written to a temp file, fsync'd, and then atomically moved over the
 * real file so readers never observe a partially written file. On an IO failure the
 * containing log directory is marked offline and a KafkaStorageException is thrown.
 *
 * NOTE: the diff-view dump of this method contained a duplicated
 * atomicMoveWithFallback call and a duplicated catch block (old + new hunk
 * interleaved); this body is the reconstructed new-side implementation.
 */
def maybeFlush(): Unit = {
  // We check dirtyTopicIdOpt first to avoid having to take the lock unnecessarily in the frequently called log append path
  dirtyTopicIdOpt.foreach { _ =>
    // We synchronize on the actual write to disk
    lock synchronized {
      dirtyTopicIdOpt.foreach { topicId =>
        try {
          // write to temp file and then swap with the existing file
          val fileOutputStream = new FileOutputStream(tempPath.toFile)
          val writer = new BufferedWriter(new OutputStreamWriter(fileOutputStream, StandardCharsets.UTF_8))
          try {
            writer.write(PartitionMetadataFileFormatter.toFile(new PartitionMetadata(CurrentVersion, topicId)))
            writer.flush()
            // fsync before the atomic move so the contents are durable at the final path
            fileOutputStream.getFD().sync()
          } finally {
            writer.close()
          }

          Utils.atomicMoveWithFallback(tempPath, path)
        } catch {
          case e: IOException =>
            val msg = s"Error while writing to partition metadata file ${file.getAbsolutePath}"
            logDirFailureChannel.maybeAddOfflineLogDir(logDir, msg, e)
            throw new KafkaStorageException(msg, e)
        }
        // Only clear the dirty marker after a successful write.
        dirtyTopicIdOpt = None
      }
    }
  }
}
Expand Down
55 changes: 53 additions & 2 deletions core/src/test/scala/unit/kafka/log/LogTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -2547,7 +2547,7 @@ class LogTest {
var log = createLog(logDir, logConfig)

val topicId = Uuid.randomUuid()
log.partitionMetadataFile.write(topicId)
log.assignTopicId(topicId)
log.close()

// test recovery case
Expand All @@ -2556,6 +2556,37 @@ class LogTest {
log.close()
}

@Test
def testLogFlushesPartitionMetadataOnAppend(): Unit = {
  // FIX: the @Test annotation was missing, so JUnit never discovered or ran this test.
  val logConfig = LogTest.createLogConfig()
  val log = createLog(logDir, logConfig)
  val record = MemoryRecords.withRecords(CompressionType.NONE, new SimpleRecord("simpleValue".getBytes))

  val topicId = Uuid.randomUuid()
  log.partitionMetadataFile.record(topicId)

  // Should trigger a synchronous flush
  log.appendAsLeader(record, leaderEpoch = 0)
  assertTrue(log.partitionMetadataFile.exists())
  assertEquals(topicId, log.partitionMetadataFile.read().topicId)
}

@Test
def testLogFlushesPartitionMetadataOnClose(): Unit = {
  // Record a topic ID without flushing it explicitly; closing the log must persist it.
  val config = LogTest.createLogConfig()
  val initialLog = createLog(logDir, config)

  val expectedId = Uuid.randomUuid()
  initialLog.partitionMetadataFile.record(expectedId)

  // close() should trigger a synchronous flush of the pending topic ID
  initialLog.close()

  // Reopen from the same dir: the metadata file must exist and carry the recorded ID.
  val reopenedLog = createLog(logDir, config)
  assertTrue(reopenedLog.partitionMetadataFile.exists())
  assertEquals(expectedId, reopenedLog.partitionMetadataFile.read().topicId)
}

/**
* Test building the time index on the follower by setting assignOffsets to false.
*/
Expand Down Expand Up @@ -3117,7 +3148,7 @@ class LogTest {
// Write a topic ID to the partition metadata file to ensure it is transferred correctly.
val id = Uuid.randomUuid()
log.topicId = id
log.partitionMetadataFile.write(id)
log.assignTopicId(id)

log.appendAsLeader(TestUtils.records(List(new SimpleRecord("foo".getBytes()))), leaderEpoch = 5)
assertEquals(Some(5), log.latestEpoch)
Expand All @@ -3135,6 +3166,26 @@ class LogTest {
assertEquals(id, log.partitionMetadataFile.read().topicId)
}

@Test
def testTopicIdFlushesBeforeDirectoryRename(): Unit = {
  val logConfig = LogTest.createLogConfig(segmentBytes = 1000, indexIntervalBytes = 1, maxMessageBytes = 64 * 1024)
  val log = createLog(logDir, logConfig)

  // Record a topic ID so there is a pending (unflushed) metadata write.
  val recordedId = Uuid.randomUuid()
  log.partitionMetadataFile.record(recordedId)

  // Renaming the log directory should flush first, then track the file under the new path.
  val partition = Log.parseTopicPartitionName(log.dir)
  log.renameDir(Log.logDeleteDirName(partition))

  // The metadata file follows the renamed directory; nothing remains at the old location.
  assertTrue(PartitionMetadataFile.newFile(log.dir).exists())
  assertFalse(PartitionMetadataFile.newFile(this.logDir).exists())

  // Check the file holds the correct contents.
  assertTrue(log.partitionMetadataFile.exists())
  assertEquals(recordedId, log.partitionMetadataFile.read().topicId)
}

@Test
def testLeaderEpochCacheClearedAfterDowngradeInAppendedMessages(): Unit = {
val logConfig = LogTest.createLogConfig(segmentBytes = 1000, indexIntervalBytes = 1, maxMessageBytes = 64 * 1024)
Expand Down
Loading