Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 35 additions & 8 deletions core/src/main/scala/kafka/log/Log.scala
Original file line number Diff line number Diff line change
Expand Up @@ -1702,11 +1702,10 @@ class Log(@volatile private var _dir: File,
* (if there is one) and returns true iff it is deletable
* @return The number of segments deleted
*/
private def deleteOldSegments(predicate: (LogSegment, Option[LogSegment]) => Boolean, reason: String): Int = {
private def deleteOldSegments(predicate: (LogSegment, Option[LogSegment]) => Boolean) = {
Comment thread
skaundinya15 marked this conversation as resolved.
Outdated
lock synchronized {
val deletable = deletableSegments(predicate)
if (deletable.nonEmpty) {
info(s"Found deletable segments with base offsets [${deletable.map(_.baseOffset).mkString(",")}] due to $reason")
deleteSegments(deletable)
} else 0
}
Expand Down Expand Up @@ -1784,8 +1783,26 @@ class Log(@volatile private var _dir: File,
private def deleteRetentionMsBreachedSegments(): Int = {
if (config.retentionMs < 0) return 0
val startMs = time.milliseconds
deleteOldSegments((segment, _) => startMs - segment.largestTimestamp > config.retentionMs,
reason = s"retention time ${config.retentionMs}ms breach")

def shouldDelete(segment: LogSegment, nextSegmentOpt: Option[LogSegment]) = {
if (startMs - segment.largestTimestamp > config.retentionMs) {
segment.largestRecordTimestamp match {
case Some(ts) =>
info(s"Segment with base offset ${segment.baseOffset} will be deleted due to" +
s" retention time ${config.retentionMs}ms breach based on the largest record timestamp from the" +
s" segment, which is $ts")
case None =>
info(s"Segment with base offset ${segment.baseOffset} will be deleted due to" +
s" retention time ${config.retentionMs}ms breach based on the last modified timestamp from the" +
s" segment, which is ${segment.lastModified}")
}
true
} else {
false
}
}

deleteOldSegments(shouldDelete)
}

private def deleteRetentionSizeBreachedSegments(): Int = {
Expand All @@ -1794,20 +1811,30 @@ class Log(@volatile private var _dir: File,
def shouldDelete(segment: LogSegment, nextSegmentOpt: Option[LogSegment]) = {
if (diff - segment.size >= 0) {
diff -= segment.size
info(s"Segment with base offset ${segment.baseOffset} will be deleted due to" +
Comment thread
skaundinya15 marked this conversation as resolved.
Outdated
s" retention size ${config.retentionSize} bytes breach. Segment size is" +
s" ${segment.size} and total log size after deletion will be ${size - diff}")
true
} else {
false
}
}

deleteOldSegments(shouldDelete, reason = s"retention size in bytes ${config.retentionSize} breach")
deleteOldSegments(shouldDelete)
}

private def deleteLogStartOffsetBreachedSegments(): Int = {
def shouldDelete(segment: LogSegment, nextSegmentOpt: Option[LogSegment]) =
nextSegmentOpt.exists(_.baseOffset <= logStartOffset)
def shouldDelete(segment: LogSegment, nextSegmentOpt: Option[LogSegment]) = {
if (nextSegmentOpt.exists(_.baseOffset <= logStartOffset)) {
info(s"Segment with base offset ${segment.baseOffset} will be deleted due to" +
s" startOffset breach. logStartOffset is $logStartOffset")
true
} else {
false
}
}

deleteOldSegments(shouldDelete, reason = s"log start offset $logStartOffset breach")
deleteOldSegments(shouldDelete)
}

def isFuture: Boolean = dir.getName.endsWith(Log.FutureDirSuffix)
Expand Down
5 changes: 5 additions & 0 deletions core/src/main/scala/kafka/log/LogSegment.scala
Original file line number Diff line number Diff line change
Expand Up @@ -633,6 +633,11 @@ class LogSegment private[log] (val log: FileRecords,
*/
def lastModified = log.file.lastModified

/**
* The largest timestamp this segment contains, if maxTimestampSoFar >= 0, otherwise None.
*/
def largestRecordTimestamp: Option[Long] = if (maxTimestampSoFar >= 0) Some(maxTimestampSoFar) else None

/**
* The largest timestamp this segment contains.
*/
Expand Down