From 00bf7846be8fcf7090b4bf7e86c4ee80a1556b66 Mon Sep 17 00:00:00 2001 From: Chetan Patidar Date: Sat, 1 Mar 2025 15:54:40 +0530 Subject: [PATCH 1/4] Restrict segment metadata kill query till maxInterval from last kill time --- .../coordinator/duty/KillUnusedSegments.java | 10 +++++-- .../duty/KillUnusedSegmentsTest.java | 27 +++++++++++++++++++ 2 files changed, 35 insertions(+), 2 deletions(-) diff --git a/server/src/main/java/org/apache/druid/server/coordinator/duty/KillUnusedSegments.java b/server/src/main/java/org/apache/druid/server/coordinator/duty/KillUnusedSegments.java index 4538d39de957..cf516d8b7f9e 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/duty/KillUnusedSegments.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/duty/KillUnusedSegments.java @@ -93,7 +93,7 @@ public class KillUnusedSegments implements CoordinatorDuty * Used to keep track of the last interval end time that was killed for each * datasource. */ - private final Map datasourceToLastKillIntervalEnd; + final Map datasourceToLastKillIntervalEnd; private DateTime lastKillTime; @@ -273,9 +273,15 @@ private Interval findIntervalForKill( ) { final DateTime minStartTime = datasourceToLastKillIntervalEnd.get(dataSource); + + // Once the first segment from a datasource is killed, we have a valid minStartTime. + // Restricting the upper bound to scan segments metadata while running the kill task results in a efficient SQL query. final DateTime maxEndTime = ignoreDurationToRetain ? DateTimes.COMPARE_DATE_AS_STRING_MAX - : DateTimes.nowUtc().minus(durationToRetain); + : DateTimes.min(DateTimes.nowUtc().minus(durationToRetain), + minStartTime != null + ? 
minStartTime.plus(maxIntervalToKill) + : DateTimes.nowUtc().minus(durationToRetain)); final List unusedSegmentIntervals = limitToPeriod( segmentsMetadataManager.getUnusedSegmentIntervals( diff --git a/server/src/test/java/org/apache/druid/server/coordinator/duty/KillUnusedSegmentsTest.java b/server/src/test/java/org/apache/druid/server/coordinator/duty/KillUnusedSegmentsTest.java index 59df67e4b49c..a35a2b4193fa 100644 --- a/server/src/test/java/org/apache/druid/server/coordinator/duty/KillUnusedSegmentsTest.java +++ b/server/src/test/java/org/apache/druid/server/coordinator/duty/KillUnusedSegmentsTest.java @@ -78,6 +78,7 @@ public class KillUnusedSegmentsTest private static final DateTime NOW = DateTimes.nowUtc(); private static final Interval YEAR_OLD = new Interval(Period.days(1), NOW.minusDays(365)); private static final Interval MONTH_OLD = new Interval(Period.days(1), NOW.minusDays(30)); + private static final Interval FIFTEEN_DAY_OLD = new Interval(Period.days(1), NOW.minusDays(15)); private static final Interval DAY_OLD = new Interval(Period.days(1), NOW.minusDays(1)); private static final Interval HOUR_OLD = new Interval(Period.days(1), NOW.minusHours(1)); private static final Interval NEXT_DAY = new Interval(Period.days(1), NOW.plusDays(1)); @@ -87,6 +88,9 @@ public class KillUnusedSegmentsTest private static final String DS2 = "DS2"; private static final String DS3 = "DS3"; + private static final Period RETAIN_DURATION = Period.hours(6); + private static final Period MAX_KILL_INTERVAL = Period.days(20); + private static final RowKey DS1_STAT_KEY = RowKey.of(Dimension.DATASOURCE, DS1); private static final RowKey DS2_STAT_KEY = RowKey.of(Dimension.DATASOURCE, DS2); private static final RowKey DS3_STAT_KEY = RowKey.of(Dimension.DATASOURCE, DS3); @@ -604,6 +608,29 @@ public void testLowerMaxIntervalToKill() validateLastKillStateAndReset(DS1, YEAR_OLD); } + @Test + public void testRestrictKillQueryToMaxInterval() + { + 
configBuilder.withDurationToRetain(RETAIN_DURATION.toStandardDuration()) + .withMaxIntervalToKill(MAX_KILL_INTERVAL); + + initDuty(); + + createAndAddUnusedSegment(DS1, MONTH_OLD, VERSION, NOW.minusDays(29)); + CoordinatorRunStats newDatasourceStats = runDutyAndGetStats(); + + Assert.assertEquals(1, newDatasourceStats.get(Stats.Kill.ELIGIBLE_UNUSED_SEGMENTS, DS1_STAT_KEY)); + validateLastKillStateAndReset(DS1, MONTH_OLD); + + createAndAddUnusedSegment(DS1, FIFTEEN_DAY_OLD, VERSION, NOW.minusDays(14)); + createAndAddUnusedSegment(DS1, DAY_OLD, VERSION, NOW.minusHours(2)); + CoordinatorRunStats oldDatasourceStats = runDutyAndGetStats(); + + Assert.assertEquals(2, oldDatasourceStats.get(Stats.Kill.ELIGIBLE_UNUSED_SEGMENTS, DS1_STAT_KEY)); + validateLastKillStateAndReset(DS1, FIFTEEN_DAY_OLD); + } + + @Test public void testHigherMaxIntervalToKill() { From 2ee9a8432cf2aaa1e5e97924f87f3ecd5da902b8 Mon Sep 17 00:00:00 2001 From: Chetan Patidar Date: Mon, 3 Mar 2025 11:43:57 +0530 Subject: [PATCH 2/4] fix variables --- .../druid/server/coordinator/duty/KillUnusedSegments.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/src/main/java/org/apache/druid/server/coordinator/duty/KillUnusedSegments.java b/server/src/main/java/org/apache/druid/server/coordinator/duty/KillUnusedSegments.java index cf516d8b7f9e..bdba2a6cc676 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/duty/KillUnusedSegments.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/duty/KillUnusedSegments.java @@ -93,7 +93,7 @@ public class KillUnusedSegments implements CoordinatorDuty * Used to keep track of the last interval end time that was killed for each * datasource. 
*/ - final Map datasourceToLastKillIntervalEnd; + private final Map datasourceToLastKillIntervalEnd; private DateTime lastKillTime; From 3a3a8476052564a08f98defcf9212794a9f1173c Mon Sep 17 00:00:00 2001 From: Chetan Patidar Date: Mon, 3 Mar 2025 18:40:41 +0530 Subject: [PATCH 3/4] fix variables --- .../server/coordinator/duty/KillUnusedSegments.java | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/server/src/main/java/org/apache/druid/server/coordinator/duty/KillUnusedSegments.java b/server/src/main/java/org/apache/druid/server/coordinator/duty/KillUnusedSegments.java index bdba2a6cc676..a587b7a5c471 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/duty/KillUnusedSegments.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/duty/KillUnusedSegments.java @@ -276,12 +276,14 @@ private Interval findIntervalForKill( // Once the first segment from a datasource is killed, we have a valid minStartTime. // Restricting the upper bound to scan segments metadata while running the kill task results in a efficient SQL query. + final DateTime maxIntervalFromLastKill = minStartTime != null + ? DateTimes.min(minStartTime.plus(maxIntervalToKill), + DateTimes.nowUtc().minus(durationToRetain)) + : DateTimes.nowUtc().minus(durationToRetain); + final DateTime maxEndTime = ignoreDurationToRetain ? DateTimes.COMPARE_DATE_AS_STRING_MAX - : DateTimes.min(DateTimes.nowUtc().minus(durationToRetain), - minStartTime != null - ? 
minStartTime.plus(maxIntervalToKill) - : DateTimes.nowUtc().minus(durationToRetain)); + : maxIntervalFromLastKill; final List unusedSegmentIntervals = limitToPeriod( segmentsMetadataManager.getUnusedSegmentIntervals( From 91a8921eecf44b37467e07540d8e2b60d10bf766 Mon Sep 17 00:00:00 2001 From: Chetan Patidar Date: Tue, 4 Mar 2025 09:46:17 +0530 Subject: [PATCH 4/4] addressed comments --- .../coordinator/duty/KillUnusedSegments.java | 20 ++++++----- .../duty/KillUnusedSegmentsTest.java | 35 +++++++++++++++---- 2 files changed, 41 insertions(+), 14 deletions(-) diff --git a/server/src/main/java/org/apache/druid/server/coordinator/duty/KillUnusedSegments.java b/server/src/main/java/org/apache/druid/server/coordinator/duty/KillUnusedSegments.java index a587b7a5c471..cd1c17469124 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/duty/KillUnusedSegments.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/duty/KillUnusedSegments.java @@ -276,14 +276,18 @@ private Interval findIntervalForKill( // Once the first segment from a datasource is killed, we have a valid minStartTime. // Restricting the upper bound to scan segments metadata while running the kill task results in a efficient SQL query. - final DateTime maxIntervalFromLastKill = minStartTime != null - ? DateTimes.min(minStartTime.plus(maxIntervalToKill), - DateTimes.nowUtc().minus(durationToRetain)) - : DateTimes.nowUtc().minus(durationToRetain); - - final DateTime maxEndTime = ignoreDurationToRetain - ? 
DateTimes.COMPARE_DATE_AS_STRING_MAX - : maxIntervalFromLastKill; + final DateTime maxEndTime; + if (ignoreDurationToRetain) { + maxEndTime = DateTimes.COMPARE_DATE_AS_STRING_MAX; + } else if (minStartTime == null) { + maxEndTime = DateTimes.nowUtc().minus(durationToRetain); + } else { + // If we have already killed a segment, limit the kill interval based on the minStartTime + maxEndTime = DateTimes.min( + DateTimes.nowUtc().minus(durationToRetain), + minStartTime.plus(maxIntervalToKill) + ); + } final List unusedSegmentIntervals = limitToPeriod( segmentsMetadataManager.getUnusedSegmentIntervals( diff --git a/server/src/test/java/org/apache/druid/server/coordinator/duty/KillUnusedSegmentsTest.java b/server/src/test/java/org/apache/druid/server/coordinator/duty/KillUnusedSegmentsTest.java index a35a2b4193fa..272ed1887ce7 100644 --- a/server/src/test/java/org/apache/druid/server/coordinator/duty/KillUnusedSegmentsTest.java +++ b/server/src/test/java/org/apache/druid/server/coordinator/duty/KillUnusedSegmentsTest.java @@ -88,9 +88,6 @@ public class KillUnusedSegmentsTest private static final String DS2 = "DS2"; private static final String DS3 = "DS3"; - private static final Period RETAIN_DURATION = Period.hours(6); - private static final Period MAX_KILL_INTERVAL = Period.days(20); - private static final RowKey DS1_STAT_KEY = RowKey.of(Dimension.DATASOURCE, DS1); private static final RowKey DS2_STAT_KEY = RowKey.of(Dimension.DATASOURCE, DS2); private static final RowKey DS3_STAT_KEY = RowKey.of(Dimension.DATASOURCE, DS3); @@ -609,19 +606,22 @@ public void testLowerMaxIntervalToKill() } @Test - public void testRestrictKillQueryToMaxInterval() + public void testMaxIntervalToKillOverridesDurationToRetain() { - configBuilder.withDurationToRetain(RETAIN_DURATION.toStandardDuration()) - .withMaxIntervalToKill(MAX_KILL_INTERVAL); + configBuilder.withDurationToRetain(Period.hours(6).toStandardDuration()) + .withMaxIntervalToKill(Period.days(20)); initDuty(); 
createAndAddUnusedSegment(DS1, MONTH_OLD, VERSION, NOW.minusDays(29)); CoordinatorRunStats newDatasourceStats = runDutyAndGetStats(); + // For a new datasource, the duration to retain is used to determine kill interval Assert.assertEquals(1, newDatasourceStats.get(Stats.Kill.ELIGIBLE_UNUSED_SEGMENTS, DS1_STAT_KEY)); validateLastKillStateAndReset(DS1, MONTH_OLD); + // For a datasource where kill has already happened, maxIntervalToKill is used + // if it leads to a smaller kill interval than durationToRetain createAndAddUnusedSegment(DS1, FIFTEEN_DAY_OLD, VERSION, NOW.minusDays(14)); createAndAddUnusedSegment(DS1, DAY_OLD, VERSION, NOW.minusHours(2)); CoordinatorRunStats oldDatasourceStats = runDutyAndGetStats(); @@ -630,6 +630,29 @@ public void testRestrictKillQueryToMaxInterval() validateLastKillStateAndReset(DS1, FIFTEEN_DAY_OLD); } + @Test + public void testDurationToRetainOverridesMaxIntervalToKill() + { + configBuilder.withDurationToRetain(Period.days(20).toStandardDuration()) + .withMaxIntervalToKill(Period.days(350)); + + initDuty(); + + createAndAddUnusedSegment(DS1, YEAR_OLD, VERSION, NOW.minusDays(29)); + CoordinatorRunStats newDatasourceStats = runDutyAndGetStats(); + + Assert.assertEquals(1, newDatasourceStats.get(Stats.Kill.ELIGIBLE_UNUSED_SEGMENTS, DS1_STAT_KEY)); + validateLastKillStateAndReset(DS1, YEAR_OLD); + + // Here (now - durationToRetain) is earlier than (end of the last killed interval, i.e. the year-old segment, plus maxIntervalToKill), + // so durationToRetain is the effective upper bound and the fifteen-day-old segment is excluded from the kill interval + createAndAddUnusedSegment(DS1, MONTH_OLD, VERSION, NOW.minusDays(29)); + createAndAddUnusedSegment(DS1, FIFTEEN_DAY_OLD, VERSION, NOW.minusDays(14)); + CoordinatorRunStats oldDatasourceStats = runDutyAndGetStats(); + + Assert.assertEquals(2, oldDatasourceStats.get(Stats.Kill.ELIGIBLE_UNUSED_SEGMENTS, DS1_STAT_KEY)); + validateLastKillStateAndReset(DS1, MONTH_OLD); + } @Test public void testHigherMaxIntervalToKill()