KAFKA-7557: optimize LogManager.truncateFullyAndStartAt() #5848
junrao merged 8 commits into apache:trunk from
Conversation
This is a good fix and we should keep it. However, I think @junrao was suggesting something even better. We call checkpointLogRecoveryOffsetsInDir from a number of places where we just want to do it for a single partition. In such cases, we can do deleteSnapshotsAfterRecoveryPointCheckpoint for that partition only.
A few suggestions:
A few suggestions:
- Maybe pass an `Option[Seq[Log]]` here, remove the default argument, and call it `affectedLogs`.
- In the documentation, we should specify that if a subset of logs is passed, we can optimize the producer snapshot deletion process. The actual checkpointing always involves all the logs.
@huxihx : Thanks for the patch. I realized that `LogManager.truncateTo()` has a similar problem since it calls `checkpointLogRecoveryOffsetsInDir` on every disk dir. To properly fix this, we should probably change `AbstractFetcherThread.maybeTruncate` to call truncate on a batch of partitions. In `LogManager.truncateTo()`, we would only call `checkpointLogRecoveryOffsetsInDir` on the disk dirs that have at least one truncated partition, and also pass along a set of partitions per disk dir to `checkpointLogRecoveryOffsetsInDir` to optimize the number of calls to `log.deleteSnapshotsAfterRecoveryPoint`. Do you think you could address that in the same PR?
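The batching idea above might look roughly like the following (a hypothetical sketch, not the actual Kafka code; the truncation bookkeeping and `getLog` lookup are simplified):

```scala
// Hypothetical sketch (simplified): truncate a batch of partitions, then
// checkpoint each affected disk dir exactly once, limiting snapshot
// deletion to the logs that were actually truncated.
def truncateTo(partitionOffsets: Map[TopicPartition, Long]): Unit = {
  val truncatedLogs = partitionOffsets.toSeq.flatMap { case (tp, offset) =>
    // assume truncateTo reports whether any data was removed
    getLog(tp).filter(_.truncateTo(offset))
  }
  truncatedLogs.groupBy(_.dir.getParentFile).foreach { case (dir, logs) =>
    checkpointLogRecoveryOffsetsInDir(dir, Some(logs))
  }
}
```

This touches each disk dir at most once per batch, instead of once per truncated partition.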
It seems that `log.dir.getAbsolutePath` should be `log.dir.getParentFile.getAbsolutePath`? Could we make this method visible at the package level and add a unit test?
Could we make `offsetTruncationStates` non-optional and get rid of `tp` and `offsetTruncationState`?
@junrao Are we safe to call
@huxihx : That's a good question. We are already calling `checkpointLogRecoveryOffsets()` in the scheduler in `LogManager` without the partition level `leaderIsrUpdateLock`. The purpose of the partition level `leaderIsrUpdateLock` is to prevent leader changes while the partition is being used. `checkpointLogRecoveryOffsets()` just writes the current per partition recovery offset to a file, which is independent of partition level leader changes. So, this seems ok.
retest this please
```diff
  // Close the log, update checkpoint files, and enqueue this log to be deleted.
  sourceLog.close()
- checkpointLogRecoveryOffsetsInDir(sourceLog.dir.getParentFile)
+ checkpointLogRecoveryOffsetsInDir(sourceLog.dir.getParentFile, Some(Seq(sourceLog)))
```
Since `sourceLog` will be deleted soon, there is no need to clean the snapshot for this partition. We could just pass in `None`.
I think it'd be better to pass it explicitly, since passing `None` could also mean doing it for all logs.
```diff
  }
  removedLog.renameDir(Log.logDeleteDirName(topicPartition))
- checkpointLogRecoveryOffsetsInDir(removedLog.dir.getParentFile)
+ checkpointLogRecoveryOffsetsInDir(removedLog.dir.getParentFile, Some(Seq(removedLog)))
```
Similar to the above, since `removedLog` will be deleted soon, there is no need to clean the snapshot for this partition. We could just pass in `None`.
Ditto. I think it'd be better to pass it explicitly, since passing `None` could also mean doing it for all logs. In `checkpointLogRecoveryOffsetsInDir`, all logs ending with `-delete` will be excluded.
retest this please
```scala
 * @param affectedLogs logs whose snapshots need to be cleaned. If it's None, the snapshots for all logs in the directory will be cleaned
 */
// Only for testing
private[log] def checkpointLogRecoveryOffsetsInDir(dir: File, affectedLogs: Option[Seq[Log]] = None): Unit = {
```
Thinking about this a bit more. I feel that `Option[Seq[Log]]` is a bit hard to understand. The issue is that we are trying to do 2 separate things (writing the recovery checkpoint file and deleting the snapshots) in a single method. Perhaps it's cleaner to split them into 2 separate methods. I am thinking of the following.

```scala
// write the recovery checkpoint in the provided directory
def checkpointLogRecoveryOffsetsInDir(dir: File): Unit

// clean the producer snapshot files in the provided logs
def cleanSnapshot(logs: Seq[Log]): Unit
```

Would that be better?
That's a good point, Jun. My only concern is that we lose the fact that these two things should happen at the same time. Is that ever not true? Another option would be to replace `Option[Seq[Log]]` with `Seq[Log]` and then just have the caller always pass the sequence.
It's mostly true that these two things should happen together. However, in the case where we want to delete a partition (`asyncDelete`), we just want to checkpoint the recovery offsets without the to-be-deleted partition. There is no need to delete the snapshot since the partition will be deleted.

But I agree, perhaps the better compromise is to have a single method like the following and force the caller to explicitly pass in the logs to clean the snapshots.

```scala
def checkpointRecoveryOffsetsAndCleanSnapshot(dir: File, logsToCleanSnapshot: Seq[Log]): Unit
```
I guess it's a good idea to have this method split into two subroutines focusing on their own jobs.
```scala
// Only for testing
private[log] def checkpointRecoveryOffsetsAndCleanSnapshot(dir: File, logsToCleanSnapshot: Seq[Log]): Unit = {
  try {
    checkpointLogRecoveryOffsetsInDir(dir)
```
It's probably better to fold the logic in `checkpointLogRecoveryOffsetsInDir(dir: File)` here and let all callers go through `checkpointRecoveryOffsetsAndCleanSnapshot()`. Currently, there are still a couple of callers to `checkpointLogRecoveryOffsetsInDir(dir: File)` directly. The issue is that `IOException` is not handled properly there as it is in `checkpointRecoveryOffsetsAndCleanSnapshot()`.
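The folding suggested above might look roughly like this (a hypothetical sketch; the real `LogManager` error handling marks the dir offline via `logDirFailureChannel`, which is simplified to an `error` log here):

```scala
// Hypothetical sketch: inline the checkpoint write into the combined method
// so every caller gets the same IOException handling.
private[log] def checkpointRecoveryOffsetsAndCleanSnapshot(dir: File, logsToCleanSnapshot: Seq[Log]): Unit = {
  try {
    // write the recovery checkpoint for all logs in this directory
    for {
      partitionToLog <- logsByDir.get(dir.getAbsolutePath)
      checkpoint <- recoveryPointCheckpoints.get(dir)
    } {
      checkpoint.write(partitionToLog.map { case (tp, log) => tp -> log.recoveryPoint })
    }
    // clean producer snapshots only for the logs the caller asked about
    logsToCleanSnapshot.foreach(_.deleteSnapshotsAfterRecoveryPointCheckpoint())
  } catch {
    case e: IOException =>
      // the real code would route this through logDirFailureChannel
      error(s"Disk error while writing recovery offsets checkpoint in dir $dir", e)
  }
}
```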
ijuma left a comment
One thing I noticed:

```scala
private def logsByDir: Map[String, Map[TopicPartition, Log]] = {
  (this.currentLogs.toList ++ this.futureLogs.toList).groupBy {
    case (_, log) => log.dir.getParent
  }.mapValues(_.toMap)
}
```

We should replace `mapValues` with `map`. The reason is that `mapValues` is lazy, and it should generally only be used for cheap operations to ensure the performance model is understandable. With the current implementation, every time someone does a `get` it triggers a `toMap` call.
Instead of calling deleteSnapshotsAfterRecoveryPointCheckpoint for allLogs, invoking it only for the logs being truncated. Reviewers: Ismael Juma <ismael@juma.me.uk>, Jun Rao <junrao@gmail.com>