diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala
index d38daf8a24109..decff4d86b089 100644
--- a/core/src/main/scala/kafka/log/Log.scala
+++ b/core/src/main/scala/kafka/log/Log.scala
@@ -1702,11 +1702,10 @@ class Log(@volatile private var _dir: File,
    * (if there is one) and returns true iff it is deletable
    * @return The number of segments deleted
    */
-  private def deleteOldSegments(predicate: (LogSegment, Option[LogSegment]) => Boolean, reason: String): Int = {
+  private def deleteOldSegments(predicate: (LogSegment, Option[LogSegment]) => Boolean) = {
     lock synchronized {
       val deletable = deletableSegments(predicate)
       if (deletable.nonEmpty) {
-        info(s"Found deletable segments with base offsets [${deletable.map(_.baseOffset).mkString(",")}] due to $reason")
         deleteSegments(deletable)
       } else 0
     }
@@ -1784,8 +1783,26 @@ class Log(@volatile private var _dir: File,
   private def deleteRetentionMsBreachedSegments(): Int = {
     if (config.retentionMs < 0) return 0
     val startMs = time.milliseconds
-    deleteOldSegments((segment, _) => startMs - segment.largestTimestamp > config.retentionMs,
-      reason = s"retention time ${config.retentionMs}ms breach")
+
+    def shouldDelete(segment: LogSegment, nextSegmentOpt: Option[LogSegment]) = {
+      if (startMs - segment.largestTimestamp > config.retentionMs) {
+        segment.largestRecordTimestamp match {
+          case Some(ts) =>
+            info(s"Segment with base offset ${segment.baseOffset} will be deleted due to" +
+              s" retention time ${config.retentionMs}ms breach based on the largest record timestamp from the" +
+              s" segment, which is $ts")
+          case None =>
+            info(s"Segment with base offset ${segment.baseOffset} will be deleted due to" +
+              s" retention time ${config.retentionMs}ms breach based on the last modified timestamp from the" +
+              s" segment, which is ${segment.lastModified}")
+        }
+        true
+      } else {
+        false
+      }
+    }
+
+    deleteOldSegments(shouldDelete)
   }
 
   private def deleteRetentionSizeBreachedSegments(): Int = {
@@ -1794,20 +1811,30 @@ class Log(@volatile private var _dir: File,
     def shouldDelete(segment: LogSegment, nextSegmentOpt: Option[LogSegment]) = {
       if (diff - segment.size >= 0) {
         diff -= segment.size
+        info(s"Segment with base offset ${segment.baseOffset} will be deleted due to" +
+          s" retention size ${config.retentionSize} bytes breach. Segment size is" +
+          s" ${segment.size} and total log size after deletion will be ${size - diff}")
         true
       } else {
         false
       }
     }
 
-    deleteOldSegments(shouldDelete, reason = s"retention size in bytes ${config.retentionSize} breach")
+    deleteOldSegments(shouldDelete)
   }
 
   private def deleteLogStartOffsetBreachedSegments(): Int = {
-    def shouldDelete(segment: LogSegment, nextSegmentOpt: Option[LogSegment]) =
-      nextSegmentOpt.exists(_.baseOffset <= logStartOffset)
+    def shouldDelete(segment: LogSegment, nextSegmentOpt: Option[LogSegment]) = {
+      if (nextSegmentOpt.exists(_.baseOffset <= logStartOffset)) {
+        info(s"Segment with base offset ${segment.baseOffset} will be deleted due to" +
+          s" startOffset breach. logStartOffset is $logStartOffset")
+        true
+      } else {
+        false
+      }
+    }
 
-    deleteOldSegments(shouldDelete, reason = s"log start offset $logStartOffset breach")
+    deleteOldSegments(shouldDelete)
   }
 
   def isFuture: Boolean = dir.getName.endsWith(Log.FutureDirSuffix)
diff --git a/core/src/main/scala/kafka/log/LogSegment.scala b/core/src/main/scala/kafka/log/LogSegment.scala
index 61a6e59240a9e..ce5d48c3230f7 100755
--- a/core/src/main/scala/kafka/log/LogSegment.scala
+++ b/core/src/main/scala/kafka/log/LogSegment.scala
@@ -633,6 +633,11 @@ class LogSegment private[log] (val log: FileRecords,
    */
   def lastModified = log.file.lastModified
 
+  /**
+   * The largest timestamp this segment contains, if maxTimestampSoFar >= 0, otherwise None.
+   */
+  def largestRecordTimestamp: Option[Long] = if (maxTimestampSoFar >= 0) Some(maxTimestampSoFar) else None
+
   /**
    * The largest timestamp this segment contains.
    */