Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
190 changes: 119 additions & 71 deletions core/src/main/scala/kafka/raft/KafkaMetadataLog.scala
Original file line number Diff line number Diff line change
Expand Up @@ -16,29 +16,30 @@
*/
package kafka.raft

import java.io.{File, IOException}
import java.nio.file.{Files, NoSuchFileException}
import java.util.concurrent.ConcurrentSkipListSet
import java.io.File
import java.nio.file.{Files, NoSuchFileException, Path}
import java.util.{Optional, Properties}

import kafka.api.ApiVersion
import kafka.log.{AppendOrigin, Log, LogConfig, LogOffsetSnapshot, SnapshotGenerated}
import kafka.server.{BrokerTopicStats, FetchHighWatermark, FetchLogEnd, LogDirFailureChannel}
import kafka.utils.{Logging, Scheduler}
import kafka.utils.{CoreUtils, Logging, Scheduler}
import org.apache.kafka.common.record.{MemoryRecords, Records}
import org.apache.kafka.common.utils.{Time, Utils}
import org.apache.kafka.common.utils.Time
import org.apache.kafka.common.{KafkaException, TopicPartition, Uuid}
import org.apache.kafka.raft.{Isolation, LogAppendInfo, LogFetchInfo, LogOffsetMetadata, OffsetAndEpoch, OffsetMetadata, ReplicatedLog}
import org.apache.kafka.snapshot.{FileRawSnapshotReader, FileRawSnapshotWriter, RawSnapshotReader, RawSnapshotWriter, SnapshotPath, Snapshots}

import scala.annotation.nowarn
import scala.collection.mutable
import scala.compat.java8.OptionConverters._

final class KafkaMetadataLog private (
log: Log,
scheduler: Scheduler,
// This object needs to be thread-safe because it is used by the snapshotting thread to notify the
// polling thread when snapshots are created.
snapshotIds: ConcurrentSkipListSet[OffsetAndEpoch],
// Access to this object needs to be synchronized because it is used by the snapshotting thread to notify the
// polling thread when snapshots are created. This object is also used to store any opened snapshot reader.
snapshots: mutable.TreeMap[OffsetAndEpoch, Option[FileRawSnapshotReader]],
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is the above comment still accurate since snapshots is no longer thread safe?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No. I updated the comment. I'll push a commit tomorrow after a few other changes.

topicPartition: TopicPartition,
maxFetchSizeInBytes: Int,
val fileDeleteDelayMs: Long // Visible for testing,
Expand Down Expand Up @@ -161,19 +162,24 @@ final class KafkaMetadataLog private (

override def truncateToLatestSnapshot(): Boolean = {
val latestEpoch = log.latestEpoch.getOrElse(0)
latestSnapshotId().asScala match {
case Some(snapshotId) if (snapshotId.epoch > latestEpoch ||
(snapshotId.epoch == latestEpoch && snapshotId.offset > endOffset().offset)) =>
val (truncated, forgottenSnapshots) = latestSnapshotId().asScala match {
Copy link
Copy Markdown
Member

@mumrah mumrah May 10, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we grab the snapshots lock for this whole match expression like we do in deleteBeforeSnapshot? Is there possibly a race between this block and deleteBeforeSnapshot?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Synchronizing on snapshots is only needed when accessing that object. In deleteBeforeSnapshot the lock is grabbed because the match expression accesses snapshots in one of its case branches.

In this method I think it is safe to only grab the lock where we currently do.

case Some(snapshotId) if (
snapshotId.epoch > latestEpoch ||
(snapshotId.epoch == latestEpoch && snapshotId.offset > endOffset().offset)
) =>
// Truncate the log fully if the latest snapshot is greater than the log end offset

log.truncateFullyAndStartAt(snapshotId.offset)
// Delete snapshot after truncating
removeSnapshotFilesBefore(snapshotId)

true

case _ => false
// Forget snapshots less than the log start offset
snapshots synchronized {
(true, forgetSnapshotsBefore(snapshotId))
}
case _ =>
(false, mutable.TreeMap.empty[OffsetAndEpoch, Option[FileRawSnapshotReader]])
}

removeSnapshots(forgottenSnapshots)
truncated
}

override def initializeLeaderEpoch(epoch: Int): Unit = {
Expand Down Expand Up @@ -242,85 +248,116 @@ final class KafkaMetadataLog private (
}

override def readSnapshot(snapshotId: OffsetAndEpoch): Optional[RawSnapshotReader] = {
try {
if (snapshotIds.contains(snapshotId)) {
Optional.of(FileRawSnapshotReader.open(log.dir.toPath, snapshotId))
} else {
Optional.empty()
snapshots synchronized {
val reader = snapshots.get(snapshotId) match {
case None =>
// Snapshot doesn't exist
None
case Some(None) =>
// Snapshot exists but has never been read before
try {
val snapshotReader = Some(FileRawSnapshotReader.open(log.dir.toPath, snapshotId))
snapshots.put(snapshotId, snapshotReader)
snapshotReader
Comment thread
jsancio marked this conversation as resolved.
} catch {
case _: NoSuchFileException =>
// Snapshot doesn't exist in the data dir; remove
val path = Snapshots.snapshotPath(log.dir.toPath, snapshotId)
warn(s"Couldn't read $snapshotId; expected to find snapshot file $path")
snapshots.remove(snapshotId)
None
}
case Some(value) =>
// Snapshot exists and it is already open; do nothing
value
}
} catch {
case _: NoSuchFileException =>
Optional.empty()

reader.asJava.asInstanceOf[Optional[RawSnapshotReader]]
}
}

override def latestSnapshotId(): Optional[OffsetAndEpoch] = {
val descending = snapshotIds.descendingIterator
if (descending.hasNext) {
Optional.of(descending.next)
} else {
Optional.empty()
snapshots synchronized {
Comment thread
jsancio marked this conversation as resolved.
snapshots.lastOption.map { case (snapshotId, _) => snapshotId }.asJava
}
}

override def earliestSnapshotId(): Optional[OffsetAndEpoch] = {
val ascendingIterator = snapshotIds.iterator
if (ascendingIterator.hasNext) {
Optional.of(ascendingIterator.next)
} else {
Optional.empty()
snapshots synchronized {
snapshots.headOption.map { case (snapshotId, _) => snapshotId }.asJava
}
}

override def onSnapshotFrozen(snapshotId: OffsetAndEpoch): Unit = {
snapshotIds.add(snapshotId)
snapshots synchronized {
snapshots.put(snapshotId, None)
}
}

override def deleteBeforeSnapshot(logStartSnapshotId: OffsetAndEpoch): Boolean = {
latestSnapshotId().asScala match {
case Some(snapshotId) if (snapshotIds.contains(logStartSnapshotId) &&
startOffset < logStartSnapshotId.offset &&
logStartSnapshotId.offset <= snapshotId.offset &&
log.maybeIncrementLogStartOffset(logStartSnapshotId.offset, SnapshotGenerated)) =>
log.deleteOldSegments()
val (deleted, forgottenSnapshots) = snapshots synchronized {
latestSnapshotId().asScala match {
case Some(snapshotId) if (snapshots.contains(logStartSnapshotId) &&
startOffset < logStartSnapshotId.offset &&
logStartSnapshotId.offset <= snapshotId.offset &&
log.maybeIncrementLogStartOffset(logStartSnapshotId.offset, SnapshotGenerated)) =>

// Delete all segments that have a "last offset" less than the log start offset
log.deleteOldSegments()

// Delete snapshot after increasing LogStartOffset
removeSnapshotFilesBefore(logStartSnapshotId)
// Forget snapshots less than the log start offset
(true, forgetSnapshotsBefore(logStartSnapshotId))
case _ =>
(false, mutable.TreeMap.empty[OffsetAndEpoch, Option[FileRawSnapshotReader]])
}
}

true
removeSnapshots(forgottenSnapshots)
deleted
}

case _ => false
}
/**
* Forget the snapshots earlier than a given snapshot id and return the associated
* snapshot readers.
*
* This method assumes that the lock for `snapshots` is already held.
*/
@nowarn("cat=deprecation") // Needed for TreeMap.until
Comment thread
jsancio marked this conversation as resolved.
private def forgetSnapshotsBefore(
  logStartSnapshotId: OffsetAndEpoch
): mutable.TreeMap[OffsetAndEpoch, Option[FileRawSnapshotReader]] = {
  // Take an independent copy of every entry ordered strictly before the given id.
  // NOTE(review): the clone() is presumably required because `until` yields a projection
  // backed by `snapshots` itself - confirm against the TreeMap documentation.
  val forgotten = snapshots.until(logStartSnapshotId).clone()

  // Drop those same ids from the live map; the copy keeps their (possibly open) readers so
  // the caller can close and delete them later.
  forgotten.keys.foreach { id => snapshots -= id }

  forgotten
}

/**
* Removes all snapshots on the log directory whose epoch and end offset is less than the giving epoch and end offset.
* Rename the given snapshots on the log directory. Asynchronously, close and delete the
* given snapshots after some delay.
*/
private def removeSnapshotFilesBefore(logStartSnapshotId: OffsetAndEpoch): Unit = {
val expiredSnapshotIdsIter = snapshotIds.headSet(logStartSnapshotId, false).iterator
while (expiredSnapshotIdsIter.hasNext) {
val snapshotId = expiredSnapshotIdsIter.next()
// If snapshotIds contains a snapshot id, the KafkaRaftClient and Listener can expect that the snapshot exists
// on the file system, so we should first remove snapshotId and then delete snapshot file.
expiredSnapshotIdsIter.remove()

val path = Snapshots.snapshotPath(log.dir.toPath, snapshotId)
val destination = Snapshots.deleteRename(path, snapshotId)
try {
Utils.atomicMoveWithFallback(path, destination, false)
} catch {
case e: IOException =>
error(s"Error renaming snapshot file: $path to $destination", e)
}
private def removeSnapshots(
expiredSnapshots: mutable.TreeMap[OffsetAndEpoch, Option[FileRawSnapshotReader]]
): Unit = {
expiredSnapshots.foreach { case (snapshotId, _) =>
Snapshots.markForDelete(log.dir.toPath, snapshotId)
}

if (expiredSnapshots.nonEmpty) {
scheduler.schedule(
"delete-snapshot-file",
() => Snapshots.deleteSnapshotIfExists(log.dir.toPath, snapshotId),
fileDeleteDelayMs)
"delete-snapshot-files",
KafkaMetadataLog.deleteSnapshotFiles(log.dir.toPath, expiredSnapshots),
Comment thread
jsancio marked this conversation as resolved.
fileDeleteDelayMs
)
}
}

/**
 * Close the underlying log and every snapshot reader that was opened through this object.
 *
 * The readers are released in a `finally` block so that an exception thrown while closing
 * the log does not leave them open or leave stale entries in `snapshots`.
 */
override def close(): Unit = {
  try {
    log.close()
  } finally {
    // Hold the monitor of `snapshots` while touching it, like the other accessors do.
    snapshots synchronized {
      // Entries mapped to None were never opened; flatten keeps only the open readers.
      snapshots.values.flatten.foreach(_.close())
      snapshots.clear()
    }
  }
}
}

Expand Down Expand Up @@ -376,8 +413,8 @@ object KafkaMetadataLog {

private def recoverSnapshots(
log: Log
): ConcurrentSkipListSet[OffsetAndEpoch] = {
val snapshotIds = new ConcurrentSkipListSet[OffsetAndEpoch]()
): mutable.TreeMap[OffsetAndEpoch, Option[FileRawSnapshotReader]] = {
val snapshots = mutable.TreeMap.empty[OffsetAndEpoch, Option[FileRawSnapshotReader]]
// Scan the log directory, deleting partial and older snapshots and remembering only the immutable snapshots
// that start at or after logStartOffset
Files
Expand All @@ -397,11 +434,22 @@ object KafkaMetadataLog {
// Delete partial snapshot, deleted snapshot and older snapshot
Files.deleteIfExists(snapshotPath.path)
} else {
snapshotIds.add(snapshotPath.snapshotId)
snapshots.put(snapshotPath.snapshotId, None)
}
}
}
snapshotIds
snapshots
}

/**
 * Build the deferred task that disposes of a batch of expired snapshots.
 *
 * Nothing happens until the returned function is invoked. When it runs, each entry has its
 * reader closed (if one was ever opened) and then its snapshot file deleted from `logDir`.
 *
 * @param logDir directory that holds the snapshot files
 * @param expiredSnapshots snapshot ids to delete, each with its reader if one was opened
 * @return a function that performs the close-and-delete when called
 */
private def deleteSnapshotFiles(
  logDir: Path,
  expiredSnapshots: mutable.TreeMap[OffsetAndEpoch, Option[FileRawSnapshotReader]]
): () => Unit = () => {
  expiredSnapshots.foreach { case (id, openReader) =>
    openReader match {
      case Some(reader) =>
        // NOTE(review): CoreUtils.swallow presumably logs and suppresses a failed close so the
        // remaining snapshots are still processed - confirm against CoreUtils.
        CoreUtils.swallow(reader.close(), this)
      case None =>
        // The snapshot was never read, so there is no reader to close.
    }
    Snapshots.deleteIfExists(logDir, id)
  }
}
}
4 changes: 1 addition & 3 deletions core/src/test/scala/kafka/raft/KafkaMetadataLogTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -99,9 +99,7 @@ final class KafkaMetadataLogTest {
snapshot.freeze()
}

TestUtils.resource(log.readSnapshot(snapshotId).get()) { snapshot =>
assertEquals(0, snapshot.sizeInBytes())
}
assertEquals(0, log.readSnapshot(snapshotId).get().sizeInBytes())
}

@Test
Expand Down
81 changes: 40 additions & 41 deletions raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -1245,51 +1245,50 @@ private FetchSnapshotResponseData handleFetchSnapshotRequest(
);
}

try (RawSnapshotReader snapshot = snapshotOpt.get()) {
long snapshotSize = snapshot.sizeInBytes();
if (partitionSnapshot.position() < 0 || partitionSnapshot.position() >= snapshotSize) {
return FetchSnapshotResponse.singleton(
log.topicPartition(),
responsePartitionSnapshot -> addQuorumLeader(responsePartitionSnapshot)
.setErrorCode(Errors.POSITION_OUT_OF_RANGE.code())
);
}

if (partitionSnapshot.position() > Integer.MAX_VALUE) {
throw new IllegalStateException(
String.format(
"Trying to fetch a snapshot with size (%s) and a position (%s) larger than %s",
snapshotSize,
partitionSnapshot.position(),
Integer.MAX_VALUE
)
);
}

int maxSnapshotSize;
try {
maxSnapshotSize = Math.toIntExact(snapshotSize);
} catch (ArithmeticException e) {
maxSnapshotSize = Integer.MAX_VALUE;
}

UnalignedRecords records = snapshot.slice(partitionSnapshot.position(), Math.min(data.maxBytes(), maxSnapshotSize));

RawSnapshotReader snapshot = snapshotOpt.get();
long snapshotSize = snapshot.sizeInBytes();
if (partitionSnapshot.position() < 0 || partitionSnapshot.position() >= snapshotSize) {
return FetchSnapshotResponse.singleton(
log.topicPartition(),
responsePartitionSnapshot -> {
addQuorumLeader(responsePartitionSnapshot)
.snapshotId()
.setEndOffset(snapshotId.offset)
.setEpoch(snapshotId.epoch);

return responsePartitionSnapshot
.setSize(snapshotSize)
.setPosition(partitionSnapshot.position())
.setUnalignedRecords(records);
}
responsePartitionSnapshot -> addQuorumLeader(responsePartitionSnapshot)
.setErrorCode(Errors.POSITION_OUT_OF_RANGE.code())
);
}

if (partitionSnapshot.position() > Integer.MAX_VALUE) {
throw new IllegalStateException(
String.format(
"Trying to fetch a snapshot with size (%s) and a position (%s) larger than %s",
snapshotSize,
partitionSnapshot.position(),
Integer.MAX_VALUE
)
);
}

int maxSnapshotSize;
try {
maxSnapshotSize = Math.toIntExact(snapshotSize);
} catch (ArithmeticException e) {
maxSnapshotSize = Integer.MAX_VALUE;
}

UnalignedRecords records = snapshot.slice(partitionSnapshot.position(), Math.min(data.maxBytes(), maxSnapshotSize));

return FetchSnapshotResponse.singleton(
log.topicPartition(),
responsePartitionSnapshot -> {
addQuorumLeader(responsePartitionSnapshot)
.snapshotId()
.setEndOffset(snapshotId.offset)
.setEpoch(snapshotId.epoch);

return responsePartitionSnapshot
.setSize(snapshotSize)
.setPosition(partitionSnapshot.position())
.setUnalignedRecords(records);
}
);
}

private boolean handleFetchSnapshotResponse(
Expand Down
Loading