From 576f5ade7218cc7f3b3ec8557aae3a17a7b35597 Mon Sep 17 00:00:00 2001 From: Sanjana Kaundinya Date: Wed, 10 Jun 2020 20:35:12 -0700 Subject: [PATCH 1/6] KAFKA-10141: Add more detail to log segment delete messages --- core/src/main/scala/kafka/log/Log.scala | 27 +++++++++++++++++++++---- 1 file changed, 23 insertions(+), 4 deletions(-) diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala index d38daf8a24109..f29192e44344b 100644 --- a/core/src/main/scala/kafka/log/Log.scala +++ b/core/src/main/scala/kafka/log/Log.scala @@ -1784,8 +1784,18 @@ class Log(@volatile private var _dir: File, private def deleteRetentionMsBreachedSegments(): Int = { if (config.retentionMs < 0) return 0 val startMs = time.milliseconds - deleteOldSegments((segment, _) => startMs - segment.largestTimestamp > config.retentionMs, - reason = s"retention time ${config.retentionMs}ms breach") + + def shouldDelete(segment: LogSegment, nextSegmentOpt: Option[LogSegment]) = { + if (startMs - segment.largestTimestamp > config.retentionMs) { + info(s"Segment with base offset ${segment.baseOffset} will be deleted due to" + + s" retentionMs breach. Largest timestamp of segment is ${segment.largestTimestamp}") + true + } else { + false + } + } + + deleteOldSegments(shouldDelete, reason = s"retention time ${config.retentionMs}ms breach") } private def deleteRetentionSizeBreachedSegments(): Int = { @@ -1794,6 +1804,8 @@ class Log(@volatile private var _dir: File, def shouldDelete(segment: LogSegment, nextSegmentOpt: Option[LogSegment]) = { if (diff - segment.size >= 0) { diff -= segment.size + info(s"Segment with base offset ${segment.baseOffset} will be deleted due to" + + s" retentionSize breach. Segment size is ${segment.size}") true } else { false @@ -1804,8 +1816,15 @@ class Log(@volatile private var _dir: File, } private def deleteLogStartOffsetBreachedSegments(): Int = { - def shouldDelete(segment: LogSegment, nextSegmentOpt: Option[LogSegment]) = - nextSegmentOpt.exists(_.baseOffset <= logStartOffset) + def shouldDelete(segment: LogSegment, nextSegmentOpt: Option[LogSegment]) = { + if (nextSegmentOpt.exists(_.baseOffset <= logStartOffset)) { + info (s"Segment with base offset ${segment.baseOffset} will be deleted due to" + + s" startOffset breach. logStartOffset is ${logStartOffset}") + true + } else { + false + } + } deleteOldSegments(shouldDelete, reason = s"log start offset $logStartOffset breach") } From 79f20319a320b253ee6c1991b01a697efb8f18c5 Mon Sep 17 00:00:00 2001 From: Sanjana Kaundinya Date: Thu, 11 Jun 2020 00:02:36 -0700 Subject: [PATCH 2/6] Addressed PR comments --- core/src/main/scala/kafka/log/Log.scala | 23 ++++++++++++------- .../src/main/scala/kafka/log/LogSegment.scala | 5 ++++ 2 files changed, 20 insertions(+), 8 deletions(-) diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala index f29192e44344b..910e5eee60357 100644 --- a/core/src/main/scala/kafka/log/Log.scala +++ b/core/src/main/scala/kafka/log/Log.scala @@ -1702,11 +1702,12 @@ class Log(@volatile private var _dir: File, * (if there is one) and returns true iff it is deletable * @return The number of segments deleted */ - private def deleteOldSegments(predicate: (LogSegment, Option[LogSegment]) => Boolean, reason: String): Int = { + private def deleteOldSegments(predicate: (LogSegment, Option[LogSegment]) => Boolean) = { lock synchronized { val deletable = deletableSegments(predicate) if (deletable.nonEmpty) { - info(s"Found deletable segments with base offsets [${deletable.map(_.baseOffset).mkString(",")}] due to $reason") + info(s"Found deletable segments with base offsets" + + s" [${deletable.map(_.baseOffset).mkString(",")}]. Deleting ${deletable.size} segments.") deleteSegments(deletable) } else 0 } @@ -1787,15 +1788,21 @@ class Log(@volatile private var _dir: File, def shouldDelete(segment: LogSegment, nextSegmentOpt: Option[LogSegment]) = { if (startMs - segment.largestTimestamp > config.retentionMs) { - info(s"Segment with base offset ${segment.baseOffset} will be deleted due to" + - s" retentionMs breach. Largest timestamp of segment is ${segment.largestTimestamp}") + segment.largestRecordTimestamp match { + case Some(ts) => + info(s"Segment with base offset ${segment.baseOffset} will be deleted due to" + + s" retentionMs breach. Largest record timestamp of segment is $ts") + case None => + info(s"Segment with base offset ${segment.baseOffset} will be deleted due to" + + s" retentionMs breach. Last modified timestamp of segment is ${segment.lastModified}") + } true } else { false } } - deleteOldSegments(shouldDelete, reason = s"retention time ${config.retentionMs}ms breach") + deleteOldSegments(shouldDelete) } private def deleteRetentionSizeBreachedSegments(): Int = { @@ -1812,13 +1819,13 @@ class Log(@volatile private var _dir: File, } } - deleteOldSegments(shouldDelete, reason = s"retention size in bytes ${config.retentionSize} breach") + deleteOldSegments(shouldDelete) } private def deleteLogStartOffsetBreachedSegments(): Int = { def shouldDelete(segment: LogSegment, nextSegmentOpt: Option[LogSegment]) = { if (nextSegmentOpt.exists(_.baseOffset <= logStartOffset)) { - info (s"Segment with base offset ${segment.baseOffset} will be deleted due to" + + info(s"Segment with base offset ${segment.baseOffset} will be deleted due to" + s" startOffset breach. logStartOffset is ${logStartOffset}") true } else { @@ -1826,7 +1833,7 @@ class Log(@volatile private var _dir: File, } } - deleteOldSegments(shouldDelete, reason = s"log start offset $logStartOffset breach") + deleteOldSegments(shouldDelete) } def isFuture: Boolean = dir.getName.endsWith(Log.FutureDirSuffix) diff --git a/core/src/main/scala/kafka/log/LogSegment.scala b/core/src/main/scala/kafka/log/LogSegment.scala index 61a6e59240a9e..ce5d48c3230f7 100755 --- a/core/src/main/scala/kafka/log/LogSegment.scala +++ b/core/src/main/scala/kafka/log/LogSegment.scala @@ -633,6 +633,11 @@ class LogSegment private[log] (val log: FileRecords, */ def lastModified = log.file.lastModified + /** + * The largest timestamp this segment contains, if maxTimestampSoFar >= 0, otherwise None. + */ + def largestRecordTimestamp: Option[Long] = if (maxTimestampSoFar >= 0) Some(maxTimestampSoFar) else None + /** * The largest timestamp this segment contains. */ From 05b176862afc410f2a482a4df6e1aca216bd7c28 Mon Sep 17 00:00:00 2001 From: Sanjana Kaundinya Date: Thu, 11 Jun 2020 18:20:17 -0700 Subject: [PATCH 3/6] Addressed PR comments --- core/src/main/scala/kafka/log/Log.scala | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala index 910e5eee60357..6e8af67aeb5aa 100644 --- a/core/src/main/scala/kafka/log/Log.scala +++ b/core/src/main/scala/kafka/log/Log.scala @@ -1791,10 +1791,12 @@ class Log(@volatile private var _dir: File, segment.largestRecordTimestamp match { case Some(ts) => info(s"Segment with base offset ${segment.baseOffset} will be deleted due to" + - s" retentionMs breach. Largest record timestamp of segment is $ts") + s" retentionMs breach based on the largest record timestamp from the segment, which" + + s" is $ts") case None => info(s"Segment with base offset ${segment.baseOffset} will be deleted due to" + - s" retentionMs breach. Last modified timestamp of segment is ${segment.lastModified}") + s" retentionMs breach based on the last modified timestamp from the segment, which" + + s" is ${segment.lastModified}") } true } else { @@ -1812,7 +1814,8 @@ class Log(@volatile private var _dir: File, if (diff - segment.size >= 0) { diff -= segment.size info(s"Segment with base offset ${segment.baseOffset} will be deleted due to" + - s" retentionSize breach. Segment size is ${segment.size}") + s" retentionSize breach. Segment size is ${segment.size} and total log size after" + + s" deletion will be ${size - diff}") true } else { false From 5ae7d6759adb555bd5c768b0923f7f4b7445e8f1 Mon Sep 17 00:00:00 2001 From: Sanjana Kaundinya Date: Wed, 17 Jun 2020 11:56:25 -0700 Subject: [PATCH 4/6] addressed PR comments --- core/src/main/scala/kafka/log/Log.scala | 16 +++++++--------- 1 file changed, 7 insertions(+), 9 deletions(-) diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala index 6e8af67aeb5aa..eaa14e6f88bc2 100644 --- a/core/src/main/scala/kafka/log/Log.scala +++ b/core/src/main/scala/kafka/log/Log.scala @@ -1706,8 +1706,6 @@ class Log(@volatile private var _dir: File, lock synchronized { val deletable = deletableSegments(predicate) if (deletable.nonEmpty) { - info(s"Found deletable segments with base offsets" + - s" [${deletable.map(_.baseOffset).mkString(",")}]. Deleting ${deletable.size} segments.") deleteSegments(deletable) } else 0 } @@ -1791,12 +1789,12 @@ class Log(@volatile private var _dir: File, segment.largestRecordTimestamp match { case Some(ts) => info(s"Segment with base offset ${segment.baseOffset} will be deleted due to" + - s" retentionMs breach based on the largest record timestamp from the segment, which" + - s" is $ts") + s" ${config.retentionMs} ms breach based on the largest record timestamp from the" + + s" segment, which is $ts") case None => info(s"Segment with base offset ${segment.baseOffset} will be deleted due to" + - s" retentionMs breach based on the last modified timestamp from the segment, which" + - s" is ${segment.lastModified}") + s" ${config.retentionMs} ms breach based on the last modified timestamp from the" + + s" segment, which is ${segment.lastModified}") } true } else { @@ -1814,8 +1812,8 @@ class Log(@volatile private var _dir: File, if (diff - segment.size >= 0) { diff -= segment.size info(s"Segment with base offset ${segment.baseOffset} will be deleted due to" + - s" retentionSize breach. Segment size is ${segment.size} and total log size after" + - s" deletion will be ${size - diff}") + s" ${config.retentionSize} breach. Segment size is ${segment.size} and total log size" + + s" after deletion will be ${size - diff}") true } else { false @@ -1829,7 +1827,7 @@ class Log(@volatile private var _dir: File, def shouldDelete(segment: LogSegment, nextSegmentOpt: Option[LogSegment]) = { if (nextSegmentOpt.exists(_.baseOffset <= logStartOffset)) { info(s"Segment with base offset ${segment.baseOffset} will be deleted due to" + - s" startOffset breach. logStartOffset is ${logStartOffset}") + s" startOffset breach. logStartOffset is $logStartOffset") true } else { false From 44fcc4b1cd3f269d193f8bceb7d89a0a175c4644 Mon Sep 17 00:00:00 2001 From: Sanjana Kaundinya Date: Wed, 17 Jun 2020 23:03:51 -0700 Subject: [PATCH 5/6] addressed PR comments --- core/src/main/scala/kafka/log/Log.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala index eaa14e6f88bc2..385677ba10eb5 100644 --- a/core/src/main/scala/kafka/log/Log.scala +++ b/core/src/main/scala/kafka/log/Log.scala @@ -1812,8 +1812,8 @@ class Log(@volatile private var _dir: File, if (diff - segment.size >= 0) { diff -= segment.size info(s"Segment with base offset ${segment.baseOffset} will be deleted due to" + - s" ${config.retentionSize} breach. Segment size is ${segment.size} and total log size" + - s" after deletion will be ${size - diff}") + s" retention size in bytes ${config.retentionSize} breach. Segment size is" + + s" ${segment.size} and total log size after deletion will be ${size - diff}") true } else { false From 068a9061f31a2536e43cb4f650b958342c0a9ae3 Mon Sep 17 00:00:00 2001 From: Jason Gustafson Date: Thu, 18 Jun 2020 17:21:56 -0700 Subject: [PATCH 6/6] Clarify retention time in comment --- core/src/main/scala/kafka/log/Log.scala | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala index 385677ba10eb5..decff4d86b089 100644 --- a/core/src/main/scala/kafka/log/Log.scala +++ b/core/src/main/scala/kafka/log/Log.scala @@ -1789,11 +1789,11 @@ class Log(@volatile private var _dir: File, segment.largestRecordTimestamp match { case Some(ts) => info(s"Segment with base offset ${segment.baseOffset} will be deleted due to" + - s" ${config.retentionMs} ms breach based on the largest record timestamp from the" + + s" retention time ${config.retentionMs}ms breach based on the largest record timestamp from the" + s" segment, which is $ts") case None => info(s"Segment with base offset ${segment.baseOffset} will be deleted due to" + - s" ${config.retentionMs} ms breach based on the last modified timestamp from the" + + s" retention time ${config.retentionMs}ms breach based on the last modified timestamp from the" + s" segment, which is ${segment.lastModified}") } true @@ -1812,7 +1812,7 @@ class Log(@volatile private var _dir: File, if (diff - segment.size >= 0) { diff -= segment.size info(s"Segment with base offset ${segment.baseOffset} will be deleted due to" + - s" retention size in bytes ${config.retentionSize} breach. Segment size is" + + s" retention size ${config.retentionSize} bytes breach. Segment size is" + s" ${segment.size} and total log size after deletion will be ${size - diff}") true } else {