diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/KillUnusedSegmentsTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/KillUnusedSegmentsTask.java index cbf4a84ba790..9d4ab1edf497 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/KillUnusedSegmentsTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/KillUnusedSegmentsTask.java @@ -273,7 +273,7 @@ public TaskStatus runTask(TaskToolbox toolbox) throws Exception toolbox.getDataSegmentKiller().kill(segmentsToBeKilled); numBatchesProcessed++; - numSegmentsKilled += unusedSegments.size(); + numSegmentsKilled += segmentsToBeKilled.size(); LOG.info("Processed [%d] batches for kill task[%s].", numBatchesProcessed, getId()); diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/KillUnusedSegmentsTaskTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/KillUnusedSegmentsTaskTest.java index e2c433536a2b..d8e7a006605e 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/KillUnusedSegmentsTaskTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/KillUnusedSegmentsTaskTest.java @@ -21,6 +21,7 @@ import com.fasterxml.jackson.core.type.TypeReference; import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; import org.apache.druid.error.DruidException; import org.apache.druid.error.DruidExceptionMatcher; @@ -42,6 +43,7 @@ import org.junit.Test; import java.util.Collections; +import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; @@ -118,6 +120,62 @@ public void testKill() throws Exception Assert.assertEquals(new KillTaskReport.Stats(1, 2, 0), getReportedStats()); } + /** + * {@code segment1}, {@code segment2} and {@code segment3} have different versions, but share the same load spec. + * {@code segment1} and {@code segment2} are unused segments, while {@code segment3} is a used segment. + * When a kill task is submitted, the unused segments {@code segment1} and {@code segment2} should be deleted from the + * metadata store, but should be retained in deep storage as the load spec is used by segment {@code segment3}. + */ + @Test + public void testKillUnusedSegmentsWithUsedLoadSpec() throws Exception + { + final DateTime now = DateTimes.nowUtc(); + final String v1 = now.toString(); + final String v2 = now.minusHours(2).toString(); + final String v3 = now.minusHours(3).toString(); + + final DataSegment segment1 = newSegment(Intervals.of("2019-01-01/2019-02-01"), v1, ImmutableMap.of("foo", "1")); + final DataSegment segment2 = newSegment(Intervals.of("2019-02-01/2019-03-01"), v2, ImmutableMap.of("foo", "1")); + final DataSegment segment3 = newSegment(Intervals.of("2019-03-01/2019-04-01"), v3, ImmutableMap.of("foo", "1")); + + final Set segments = ImmutableSet.of(segment1, segment2, segment3); + final Set unusedSegments = ImmutableSet.of(segment1, segment2); + + Assert.assertEquals(segments, getMetadataStorageCoordinator().commitSegments(segments)); + Assert.assertEquals( + unusedSegments.size(), + getSegmentsMetadataManager().markSegmentsAsUnused( + unusedSegments.stream().map(DataSegment::getId).collect(Collectors.toSet()) + ) + ); + + final KillUnusedSegmentsTask task = new KillUnusedSegmentsTask( + null, + DATA_SOURCE, + Intervals.of("2018/2020"), + null, + false, + null, + 100, + null + ); + + Assert.assertEquals(TaskState.SUCCESS, taskRunner.run(task).get().getStatusCode()); + Assert.assertEquals( + new KillTaskReport.Stats(0, 1, 0), + getReportedStats() + ); + + final List observedUnusedSegments = getMetadataStorageCoordinator().retrieveUnusedSegmentsForInterval( + DATA_SOURCE, + Intervals.of("2018/2020"), + null, + null + ); + + Assert.assertEquals(ImmutableSet.of(), new HashSet<>(observedUnusedSegments)); + } + @Test public void testKillWithMarkUnused() throws Exception @@ -823,4 +881,19 @@ private static DataSegment newSegment(Interval interval, String version) 10L ); } + + private static DataSegment newSegment(Interval interval, String version, Map loadSpec) + { + return new DataSegment( + DATA_SOURCE, + interval, + version, + loadSpec, + null, + null, + null, + 9, + 10L + ); + } }