diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/KillUnusedSegmentsTaskTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/KillUnusedSegmentsTaskTest.java index 2de9a0f10f2a..1e36cc825a01 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/KillUnusedSegmentsTaskTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/KillUnusedSegmentsTaskTest.java @@ -55,109 +55,94 @@ public class KillUnusedSegmentsTaskTest extends IngestionTestBase private TestTaskRunner taskRunner; + private DataSegment segment1; + private DataSegment segment2; + private DataSegment segment3; + private DataSegment segment4; + @Before public void setup() { taskRunner = new TestTaskRunner(); + + final String version = DateTimes.nowUtc().toString(); + segment1 = newSegment(Intervals.of("2019-01-01/2019-02-01"), version); + segment2 = newSegment(Intervals.of("2019-02-01/2019-03-01"), version); + segment3 = newSegment(Intervals.of("2019-03-01/2019-04-01"), version); + segment4 = newSegment(Intervals.of("2019-04-01/2019-05-01"), version); } @Test public void testKill() throws Exception { - final String version = DateTimes.nowUtc().toString(); - final Set segments = ImmutableSet.of( - newSegment(Intervals.of("2019-01-01/2019-02-01"), version), - newSegment(Intervals.of("2019-02-01/2019-03-01"), version), - newSegment(Intervals.of("2019-03-01/2019-04-01"), version), - newSegment(Intervals.of("2019-04-01/2019-05-01"), version) - ); + final Set segments = ImmutableSet.of(segment1, segment2, segment3, segment4); final Set announced = getMetadataStorageCoordinator().commitSegments(segments); - Assert.assertEquals(segments, announced); Assert.assertTrue( getSegmentsMetadataManager().markSegmentAsUnused( - newSegment(Intervals.of("2019-02-01/2019-03-01"), version).getId() + segment2.getId() ) ); Assert.assertTrue( getSegmentsMetadataManager().markSegmentAsUnused( - newSegment(Intervals.of("2019-03-01/2019-04-01"), version).getId() + segment3.getId() ) ); - final KillUnusedSegmentsTask task = - new KillUnusedSegmentsTask( - null, - DATA_SOURCE, - Intervals.of("2019-03-01/2019-04-01"), - null, - null, - false, - null, - null, - null - ); + final KillUnusedSegmentsTask task = new KillUnusedSegmentsTaskBuilder() + .dataSource(DATA_SOURCE) + .interval(Intervals.of("2019-03-01/2019-04-01")) + .build(); Assert.assertEquals(TaskState.SUCCESS, taskRunner.run(task).get().getStatusCode()); - final List unusedSegments = getMetadataStorageCoordinator().retrieveUnusedSegmentsForInterval( - DATA_SOURCE, - Intervals.of("2019/2020"), - null, - null, - null + final List observedUnusedSegments = + getMetadataStorageCoordinator().retrieveUnusedSegmentsForInterval( + DATA_SOURCE, + Intervals.of("2019/2020"), + null, + null, + null ); - Assert.assertEquals(ImmutableList.of(newSegment(Intervals.of("2019-02-01/2019-03-01"), version)), unusedSegments); - Assertions.assertThat(getMetadataStorageCoordinator().retrieveUsedSegmentsForInterval( - DATA_SOURCE, - Intervals.of("2019/2020"), - Segments.ONLY_VISIBLE) - ).containsExactlyInAnyOrder( - newSegment(Intervals.of("2019-01-01/2019-02-01"), version), - newSegment(Intervals.of("2019-04-01/2019-05-01"), version) - ); + Assert.assertEquals(ImmutableList.of(segment2), observedUnusedSegments); + Assertions.assertThat( + getMetadataStorageCoordinator().retrieveUsedSegmentsForInterval( + DATA_SOURCE, + Intervals.of("2019/2020"), + Segments.ONLY_VISIBLE + ) + ).containsExactlyInAnyOrder(segment1, segment4); - Assert.assertEquals(new KillTaskReport.Stats(1, 2, 0), getReportedStats()); + Assert.assertEquals( + new KillTaskReport.Stats(1, 2, 0), + getReportedStats() + ); } @Test public void testKillWithMarkUnused() throws Exception { - final String version = DateTimes.nowUtc().toString(); - final Set segments = ImmutableSet.of( - newSegment(Intervals.of("2019-01-01/2019-02-01"), version), - newSegment(Intervals.of("2019-02-01/2019-03-01"), version), - newSegment(Intervals.of("2019-03-01/2019-04-01"), version), - newSegment(Intervals.of("2019-04-01/2019-05-01"), version) - ); + final Set segments = ImmutableSet.of(segment1, segment2, segment3, segment4); final Set announced = getMetadataStorageCoordinator().commitSegments(segments); - Assert.assertEquals(segments, announced); Assert.assertTrue( getSegmentsMetadataManager().markSegmentAsUnused( - newSegment(Intervals.of("2019-02-01/2019-03-01"), version).getId() + segment2.getId() ) ); - final KillUnusedSegmentsTask task = - new KillUnusedSegmentsTask( - null, - DATA_SOURCE, - Intervals.of("2019-03-01/2019-04-01"), - null, - null, - true, - null, - null, - null - ); + final KillUnusedSegmentsTask task = new KillUnusedSegmentsTaskBuilder() + .dataSource(DATA_SOURCE) + .interval(Intervals.of("2019-03-01/2019-04-01")) + .markAsUnused(true) + .build(); Assert.assertEquals(TaskState.SUCCESS, taskRunner.run(task).get().getStatusCode()); - final List unusedSegments = + final List observedUnusedSegments = getMetadataStorageCoordinator().retrieveUnusedSegmentsForInterval( DATA_SOURCE, Intervals.of("2019/2020"), @@ -166,16 +151,19 @@ public void testKillWithMarkUnused() throws Exception null ); - Assert.assertEquals(ImmutableList.of(newSegment(Intervals.of("2019-02-01/2019-03-01"), version)), unusedSegments); + Assert.assertEquals(ImmutableList.of(segment2), observedUnusedSegments); Assertions.assertThat( - getMetadataStorageCoordinator() - .retrieveUsedSegmentsForInterval(DATA_SOURCE, Intervals.of("2019/2020"), Segments.ONLY_VISIBLE) - ).containsExactlyInAnyOrder( - newSegment(Intervals.of("2019-01-01/2019-02-01"), version), - newSegment(Intervals.of("2019-04-01/2019-05-01"), version) - ); + getMetadataStorageCoordinator().retrieveUsedSegmentsForInterval( + DATA_SOURCE, + Intervals.of("2019/2020"), + Segments.ONLY_VISIBLE + ) + ).containsExactlyInAnyOrder(segment1, segment4); - Assert.assertEquals(new KillTaskReport.Stats(1, 2, 1), getReportedStats()); + Assert.assertEquals( + new KillTaskReport.Stats(1, 2, 1), + getReportedStats() + ); } @Test @@ -186,13 +174,13 @@ public void testKillSegmentsWithVersions() throws Exception final String v2 = now.minusHours(2).toString(); final String v3 = now.minusHours(3).toString(); - final DataSegment segment1 = newSegment(Intervals.of("2019-01-01/2019-02-01"), v1); - final DataSegment segment2 = newSegment(Intervals.of("2019-02-01/2019-03-01"), v1); - final DataSegment segment3 = newSegment(Intervals.of("2019-03-01/2019-04-01"), v1); - final DataSegment segment4 = newSegment(Intervals.of("2019-01-01/2019-02-01"), v2); - final DataSegment segment5 = newSegment(Intervals.of("2019-01-01/2019-02-01"), v3); + final DataSegment segment1V1 = newSegment(Intervals.of("2019-01-01/2019-02-01"), v1); + final DataSegment segment2V1 = newSegment(Intervals.of("2019-02-01/2019-03-01"), v1); + final DataSegment segment3V1 = newSegment(Intervals.of("2019-03-01/2019-04-01"), v1); + final DataSegment segment4V2 = newSegment(Intervals.of("2019-01-01/2019-02-01"), v2); + final DataSegment segment5V3 = newSegment(Intervals.of("2019-01-01/2019-02-01"), v3); - final Set segments = ImmutableSet.of(segment1, segment2, segment3, segment4, segment5); + final Set segments = ImmutableSet.of(segment1V1, segment2V1, segment3V1, segment4V2, segment5V3); Assert.assertEquals(segments, getMetadataStorageCoordinator().commitSegments(segments)); Assert.assertEquals( @@ -202,17 +190,12 @@ public void testKillSegmentsWithVersions() throws Exception ) ); - final KillUnusedSegmentsTask task = new KillUnusedSegmentsTask( - null, - DATA_SOURCE, - Intervals.of("2018/2020"), - ImmutableList.of(v1, v2), - null, - false, - 3, - null, - null - ); + final KillUnusedSegmentsTask task = new KillUnusedSegmentsTaskBuilder() + .dataSource(DATA_SOURCE) + .interval(Intervals.of("2018/2020")) + .versions(ImmutableList.of(v1, v2)) + .batchSize(3) + .build(); Assert.assertEquals(TaskState.SUCCESS, taskRunner.run(task).get().getStatusCode()); Assert.assertEquals( @@ -220,14 +203,15 @@ public void testKillSegmentsWithVersions() throws Exception getReportedStats() ); - final List observedUnusedSegments = getMetadataStorageCoordinator().retrieveUnusedSegmentsForInterval( - DATA_SOURCE, - Intervals.of("2018/2020"), - null, - null - ); + final List observedUnusedSegments = + getMetadataStorageCoordinator().retrieveUnusedSegmentsForInterval( + DATA_SOURCE, + Intervals.of("2018/2020"), + null, + null + ); - Assert.assertEquals(ImmutableSet.of(segment5), new HashSet<>(observedUnusedSegments)); + Assert.assertEquals(ImmutableSet.of(segment5V3), new HashSet<>(observedUnusedSegments)); } @Test @@ -238,13 +222,13 @@ public void testKillSegmentsWithVersionsAndLimit() throws Exception final String v2 = now.minusHours(2).toString(); final String v3 = now.minusHours(3).toString(); - final DataSegment segment1 = newSegment(Intervals.of("2019-01-01/2019-02-01"), v1); - final DataSegment segment2 = newSegment(Intervals.of("2019-02-01/2019-03-01"), v1); - final DataSegment segment3 = newSegment(Intervals.of("2019-03-01/2019-04-01"), v1); - final DataSegment segment4 = newSegment(Intervals.of("2019-01-01/2019-02-01"), v2); - final DataSegment segment5 = newSegment(Intervals.of("2019-01-01/2019-02-01"), v3); + final DataSegment segment1V1 = newSegment(Intervals.of("2019-01-01/2019-02-01"), v1); + final DataSegment segment2V1 = newSegment(Intervals.of("2019-02-01/2019-03-01"), v1); + final DataSegment segment3V1 = newSegment(Intervals.of("2019-03-01/2019-04-01"), v1); + final DataSegment segment4V2 = newSegment(Intervals.of("2019-01-01/2019-02-01"), v2); + final DataSegment segment5V3 = newSegment(Intervals.of("2019-01-01/2019-02-01"), v3); - final Set segments = ImmutableSet.of(segment1, segment2, segment3, segment4, segment5); + final Set segments = ImmutableSet.of(segment1V1, segment2V1, segment3V1, segment4V2, segment5V3); Assert.assertEquals(segments, getMetadataStorageCoordinator().commitSegments(segments)); Assert.assertEquals( @@ -254,17 +238,13 @@ public void testKillSegmentsWithVersionsAndLimit() throws Exception ) ); - final KillUnusedSegmentsTask task = new KillUnusedSegmentsTask( - null, - DATA_SOURCE, - Intervals.of("2018/2020"), - ImmutableList.of(v1), - null, - false, - 3, - 2, - null - ); + final KillUnusedSegmentsTask task = new KillUnusedSegmentsTaskBuilder() + .dataSource(DATA_SOURCE) + .interval(Intervals.of("2018/2020")) + .versions(ImmutableList.of(v1)) + .batchSize(3) + .limit(2) + .build(); Assert.assertEquals(TaskState.SUCCESS, taskRunner.run(task).get().getStatusCode()); Assert.assertEquals( @@ -272,14 +252,15 @@ public void testKillSegmentsWithVersionsAndLimit() throws Exception getReportedStats() ); - final List observedUnusedSegments = getMetadataStorageCoordinator().retrieveUnusedSegmentsForInterval( - DATA_SOURCE, - Intervals.of("2018/2020"), - null, - null - ); + final List observedUnusedSegments = + getMetadataStorageCoordinator().retrieveUnusedSegmentsForInterval( + DATA_SOURCE, + Intervals.of("2018/2020"), + null, + null + ); - Assert.assertEquals(ImmutableSet.of(segment3, segment4, segment5), new HashSet<>(observedUnusedSegments)); + Assert.assertEquals(ImmutableSet.of(segment3V1, segment4V2, segment5V3), new HashSet<>(observedUnusedSegments)); } @Test @@ -290,13 +271,13 @@ public void testKillWithNonExistentVersion() throws Exception final String v2 = now.minusHours(2).toString(); final String v3 = now.minusHours(3).toString(); - final DataSegment segment1 = newSegment(Intervals.of("2019-01-01/2019-02-01"), v1); - final DataSegment segment2 = newSegment(Intervals.of("2019-02-01/2019-03-01"), v1); - final DataSegment segment3 = newSegment(Intervals.of("2019-03-01/2019-04-01"), v1); - final DataSegment segment4 = newSegment(Intervals.of("2019-01-01/2019-02-01"), v2); - final DataSegment segment5 = newSegment(Intervals.of("2019-01-01/2019-02-01"), v3); + final DataSegment segment1V1 = newSegment(Intervals.of("2019-01-01/2019-02-01"), v1); + final DataSegment segment2V1 = newSegment(Intervals.of("2019-02-01/2019-03-01"), v1); + final DataSegment segment3V1 = newSegment(Intervals.of("2019-03-01/2019-04-01"), v1); + final DataSegment segment4V2 = newSegment(Intervals.of("2019-01-01/2019-02-01"), v2); + final DataSegment segment5V3 = newSegment(Intervals.of("2019-01-01/2019-02-01"), v3); - final Set segments = ImmutableSet.of(segment1, segment2, segment3, segment4, segment5); + final Set segments = ImmutableSet.of(segment1V1, segment2V1, segment3V1, segment4V2, segment5V3); Assert.assertEquals(segments, getMetadataStorageCoordinator().commitSegments(segments)); Assert.assertEquals( @@ -306,17 +287,13 @@ public void testKillWithNonExistentVersion() throws Exception ) ); - final KillUnusedSegmentsTask task = new KillUnusedSegmentsTask( - null, - DATA_SOURCE, - Intervals.of("2018/2020"), - ImmutableList.of(now.plusDays(100).toString()), - null, - false, - 3, - 2, - null - ); + final KillUnusedSegmentsTask task = new KillUnusedSegmentsTaskBuilder() + .dataSource(DATA_SOURCE) + .interval(Intervals.of("2018/2020")) + .versions(ImmutableList.of(now.plusDays(100).toString())) + .batchSize(3) + .limit(2) + .build(); Assert.assertEquals(TaskState.SUCCESS, taskRunner.run(task).get().getStatusCode()); Assert.assertEquals( @@ -324,12 +301,13 @@ public void testKillWithNonExistentVersion() throws Exception getReportedStats() ); - final List observedUnusedSegments = getMetadataStorageCoordinator().retrieveUnusedSegmentsForInterval( - DATA_SOURCE, - Intervals.of("2018/2020"), - null, - null - ); + final List observedUnusedSegments = + getMetadataStorageCoordinator().retrieveUnusedSegmentsForInterval( + DATA_SOURCE, + Intervals.of("2018/2020"), + null, + null + ); Assert.assertEquals(segments, new HashSet<>(observedUnusedSegments)); } @@ -348,12 +326,12 @@ public void testKillUnusedSegmentsWithUsedLoadSpec() throws Exception final String v2 = now.minusHours(2).toString(); final String v3 = now.minusHours(3).toString(); - final DataSegment segment1 = newSegment(Intervals.of("2019-01-01/2019-02-01"), v1, ImmutableMap.of("foo", "1")); - final DataSegment segment2 = newSegment(Intervals.of("2019-02-01/2019-03-01"), v2, ImmutableMap.of("foo", "1")); - final DataSegment segment3 = newSegment(Intervals.of("2019-03-01/2019-04-01"), v3, ImmutableMap.of("foo", "1")); + final DataSegment segment1V1 = newSegment(Intervals.of("2019-01-01/2019-02-01"), v1, ImmutableMap.of("foo", "1")); + final DataSegment segment2V2 = newSegment(Intervals.of("2019-02-01/2019-03-01"), v2, ImmutableMap.of("foo", "1")); + final DataSegment segment3V3 = newSegment(Intervals.of("2019-03-01/2019-04-01"), v3, ImmutableMap.of("foo", "1")); - final Set segments = ImmutableSet.of(segment1, segment2, segment3); - final Set unusedSegments = ImmutableSet.of(segment1, segment2); + final Set segments = ImmutableSet.of(segment1V1, segment2V2, segment3V3); + final Set unusedSegments = ImmutableSet.of(segment1V1, segment2V2); Assert.assertEquals(segments, getMetadataStorageCoordinator().commitSegments(segments)); Assert.assertEquals( @@ -363,17 +341,12 @@ public void testKillUnusedSegmentsWithUsedLoadSpec() throws Exception ) ); - final KillUnusedSegmentsTask task = new KillUnusedSegmentsTask( - null, - DATA_SOURCE, - Intervals.of("2018/2020"), - ImmutableList.of(v1, v2), - null, - false, - null, - 100, - null - ); + final KillUnusedSegmentsTask task = new KillUnusedSegmentsTaskBuilder() + .dataSource(DATA_SOURCE) + .interval(Intervals.of("2018/2020")) + .versions(ImmutableList.of(v1, v2)) + .limit(100) + .build(); Assert.assertEquals(TaskState.SUCCESS, taskRunner.run(task).get().getStatusCode()); Assert.assertEquals( @@ -381,12 +354,13 @@ public void testKillUnusedSegmentsWithUsedLoadSpec() throws Exception getReportedStats() ); - final List observedUnusedSegments = getMetadataStorageCoordinator().retrieveUnusedSegmentsForInterval( - DATA_SOURCE, - Intervals.of("2018/2020"), - null, - null - ); + final List observedUnusedSegments = + getMetadataStorageCoordinator().retrieveUnusedSegmentsForInterval( + DATA_SOURCE, + Intervals.of("2018/2020"), + null, + null + ); Assert.assertEquals(ImmutableSet.of(), new HashSet<>(observedUnusedSegments)); } @@ -394,35 +368,21 @@ public void testKillUnusedSegmentsWithUsedLoadSpec() throws Exception @Test public void testGetInputSourceResources() { - final KillUnusedSegmentsTask task = - new KillUnusedSegmentsTask( - null, - DATA_SOURCE, - Intervals.of("2019-03-01/2019-04-01"), - null, - null, - true, - null, - null, - null - ); + final KillUnusedSegmentsTask task = new KillUnusedSegmentsTaskBuilder() + .dataSource(DATA_SOURCE) + .interval(Intervals.of("2019-03-01/2019-04-01")) + .markAsUnused(true) + .build(); Assert.assertTrue(task.getInputSourceResources().isEmpty()); } @Test public void testKillBatchSizeOneAndLimit4() throws Exception { - final String version = DateTimes.nowUtc().toString(); - final Set segments = ImmutableSet.of( - newSegment(Intervals.of("2019-01-01/2019-02-01"), version), - newSegment(Intervals.of("2019-02-01/2019-03-01"), version), - newSegment(Intervals.of("2019-03-01/2019-04-01"), version), - newSegment(Intervals.of("2019-04-01/2019-05-01"), version) - ); + final Set segments = ImmutableSet.of(segment1, segment2, segment3, segment4); final Set announced = getMetadataStorageCoordinator().commitSegments(segments); Assert.assertEquals(segments, announced); - Assert.assertEquals( segments.size(), getSegmentsMetadataManager().markAsUnusedSegmentsInInterval( @@ -431,24 +391,18 @@ public void testKillBatchSizeOneAndLimit4() throws Exception ) ); - final KillUnusedSegmentsTask task = - new KillUnusedSegmentsTask( - null, - DATA_SOURCE, - Intervals.of("2018-01-01/2020-01-01"), - null, - null, - false, - 1, - 4, - null - ); + final KillUnusedSegmentsTask task = new KillUnusedSegmentsTaskBuilder() + .dataSource(DATA_SOURCE) + .interval(Intervals.of("2018-01-01/2020-01-01")) + .batchSize(1) + .limit(4) + .build(); Assert.assertEquals(TaskState.SUCCESS, taskRunner.run(task).get().getStatusCode()); // we expect ALL tasks to be deleted - final List unusedSegments = + final List observedUnusedSegments = getMetadataStorageCoordinator().retrieveUnusedSegmentsForInterval( DATA_SOURCE, Intervals.of("2019/2020"), @@ -456,8 +410,11 @@ public void testKillBatchSizeOneAndLimit4() throws Exception null ); - Assert.assertEquals(Collections.emptyList(), unusedSegments); - Assert.assertEquals(new KillTaskReport.Stats(4, 4, 0), getReportedStats()); + Assert.assertEquals(Collections.emptyList(), observedUnusedSegments); + Assert.assertEquals( + new KillTaskReport.Stats(4, 4, 0), + getReportedStats() + ); } /** @@ -468,12 +425,6 @@ public void testKillBatchSizeOneAndLimit4() throws Exception @Test public void testKillMultipleUnusedSegmentsWithNullMaxUsedStatusLastUpdatedTime() throws Exception { - final String version = DateTimes.nowUtc().toString(); - final DataSegment segment1 = newSegment(Intervals.of("2019-01-01/2019-02-01"), version); - final DataSegment segment2 = newSegment(Intervals.of("2019-02-01/2019-03-01"), version); - final DataSegment segment3 = newSegment(Intervals.of("2019-03-01/2019-04-01"), version); - final DataSegment segment4 = newSegment(Intervals.of("2019-04-01/2019-05-01"), version); - final Set segments = ImmutableSet.of(segment1, segment2, segment3, segment4); final Set announced = getMetadataStorageCoordinator().commitSegments(segments); @@ -509,23 +460,16 @@ public void testKillMultipleUnusedSegmentsWithNullMaxUsedStatusLastUpdatedTime() final Interval umbrellaInterval = JodaUtils.umbrellaInterval(segmentIntervals); - - final KillUnusedSegmentsTask task = - new KillUnusedSegmentsTask( - null, - DATA_SOURCE, - umbrellaInterval, - null, - null, - false, - 1, - 10, - null - ); + final KillUnusedSegmentsTask task = new KillUnusedSegmentsTaskBuilder() + .dataSource(DATA_SOURCE) + .interval(umbrellaInterval) + .batchSize(1) + .limit(10) + .build(); Assert.assertEquals(TaskState.SUCCESS, taskRunner.run(task).get().getStatusCode()); - final List unusedSegments = + final List observedUnusedSegments = getMetadataStorageCoordinator().retrieveUnusedSegmentsForInterval( DATA_SOURCE, umbrellaInterval, @@ -533,8 +477,11 @@ public void testKillMultipleUnusedSegmentsWithNullMaxUsedStatusLastUpdatedTime() null ); - Assert.assertEquals(ImmutableList.of(), unusedSegments); - Assert.assertEquals(new KillTaskReport.Stats(3, 4, 0), getReportedStats()); + Assert.assertEquals(ImmutableList.of(), observedUnusedSegments); + Assert.assertEquals( + new KillTaskReport.Stats(3, 4, 0), + getReportedStats() + ); } /** @@ -552,12 +499,6 @@ public void testKillMultipleUnusedSegmentsWithNullMaxUsedStatusLastUpdatedTime() @Test public void testKillMultipleUnusedSegmentsWithDifferentMaxUsedStatusLastUpdatedTime() throws Exception { - final String version = DateTimes.nowUtc().toString(); - final DataSegment segment1 = newSegment(Intervals.of("2019-01-01/2019-02-01"), version); - final DataSegment segment2 = newSegment(Intervals.of("2019-02-01/2019-03-01"), version); - final DataSegment segment3 = newSegment(Intervals.of("2019-03-01/2019-04-01"), version); - final DataSegment segment4 = newSegment(Intervals.of("2019-04-01/2019-05-01"), version); - final Set segments = ImmutableSet.of(segment1, segment2, segment3, segment4); final Set announced = getMetadataStorageCoordinator().commitSegments(segments); @@ -579,13 +520,11 @@ public void testKillMultipleUnusedSegmentsWithDifferentMaxUsedStatusLastUpdatedT ) ); - // Capture the last updated time cutoff - final DateTime maxUsedStatusLastUpdatedTime1 = DateTimes.nowUtc(); + final DateTime lastUpdatedTime1 = DateTimes.nowUtc(); + derbyConnectorRule.segments().updateUsedStatusLastUpdated(segment1.getId().toString(), lastUpdatedTime1); + derbyConnectorRule.segments().updateUsedStatusLastUpdated(segment4.getId().toString(), lastUpdatedTime1); - // Delay for 1s, mark the segments as unused and then capture the last updated time cutoff again - Thread.sleep(1000); - - // now mark the third segment as unused + // Now mark the third segment as unused Assert.assertEquals( 1, getSegmentsMetadataManager().markAsUnusedSegmentsInInterval( @@ -594,8 +533,8 @@ public void testKillMultipleUnusedSegmentsWithDifferentMaxUsedStatusLastUpdatedT ) ); - final DateTime maxUsedStatusLastUpdatedTime2 = DateTimes.nowUtc(); - + final DateTime lastUpdatedTime2 = DateTimes.nowUtc(); + derbyConnectorRule.segments().updateUsedStatusLastUpdated(segment3.getId().toString(), lastUpdatedTime2); final List segmentIntervals = segments.stream() .map(DataSegment::getInterval) @@ -603,22 +542,17 @@ public void testKillMultipleUnusedSegmentsWithDifferentMaxUsedStatusLastUpdatedT final Interval umbrellaInterval = JodaUtils.umbrellaInterval(segmentIntervals); - final KillUnusedSegmentsTask task1 = - new KillUnusedSegmentsTask( - null, - DATA_SOURCE, - umbrellaInterval, - null, - null, - false, - 1, - 10, - maxUsedStatusLastUpdatedTime1 - ); + final KillUnusedSegmentsTask task1 = new KillUnusedSegmentsTaskBuilder() + .dataSource(DATA_SOURCE) + .interval(umbrellaInterval) + .batchSize(1) + .limit(10) + .maxUsedStatusLastUpdatedTime(lastUpdatedTime1) + .build(); Assert.assertEquals(TaskState.SUCCESS, taskRunner.run(task1).get().getStatusCode()); - final List unusedSegments = + final List observedUnusedSegments = getMetadataStorageCoordinator().retrieveUnusedSegmentsForInterval( DATA_SOURCE, umbrellaInterval, @@ -626,25 +560,23 @@ public void testKillMultipleUnusedSegmentsWithDifferentMaxUsedStatusLastUpdatedT null ); - Assert.assertEquals(ImmutableList.of(segment3), unusedSegments); - Assert.assertEquals(new KillTaskReport.Stats(2, 3, 0), getReportedStats()); + Assert.assertEquals(ImmutableList.of(segment3), observedUnusedSegments); + Assert.assertEquals( + new KillTaskReport.Stats(2, 3, 0), + getReportedStats() + ); - final KillUnusedSegmentsTask task2 = - new KillUnusedSegmentsTask( - null, - DATA_SOURCE, - umbrellaInterval, - null, - null, - false, - 1, - 10, - maxUsedStatusLastUpdatedTime2 - ); + final KillUnusedSegmentsTask task2 = new KillUnusedSegmentsTaskBuilder() + .dataSource(DATA_SOURCE) + .interval(umbrellaInterval) + .batchSize(1) + .limit(10) + .maxUsedStatusLastUpdatedTime(lastUpdatedTime2) + .build(); Assert.assertEquals(TaskState.SUCCESS, taskRunner.run(task2).get().getStatusCode()); - final List unusedSegments2 = + final List observedUnusedSegments2 = getMetadataStorageCoordinator().retrieveUnusedSegmentsForInterval( DATA_SOURCE, umbrellaInterval, @@ -652,8 +584,11 @@ public void testKillMultipleUnusedSegmentsWithDifferentMaxUsedStatusLastUpdatedT null ); - Assert.assertEquals(ImmutableList.of(), unusedSegments2); - Assert.assertEquals(new KillTaskReport.Stats(1, 2, 0), getReportedStats()); + Assert.assertEquals(ImmutableList.of(), observedUnusedSegments2); + Assert.assertEquals( + new KillTaskReport.Stats(1, 2, 0), + getReportedStats() + ); } /** @@ -672,12 +607,6 @@ public void testKillMultipleUnusedSegmentsWithDifferentMaxUsedStatusLastUpdatedT @Test public void testKillMultipleUnusedSegmentsWithDifferentMaxUsedStatusLastUpdatedTime2() throws Exception { - final String version = DateTimes.nowUtc().toString(); - final DataSegment segment1 = newSegment(Intervals.of("2019-01-01/2019-02-01"), version); - final DataSegment segment2 = newSegment(Intervals.of("2019-02-01/2019-03-01"), version); - final DataSegment segment3 = newSegment(Intervals.of("2019-03-01/2019-04-01"), version); - final DataSegment segment4 = newSegment(Intervals.of("2019-04-01/2019-05-01"), version); - final Set segments = ImmutableSet.of(segment1, segment2, segment3, segment4); final Set announced = getMetadataStorageCoordinator().commitSegments(segments); @@ -693,10 +622,9 @@ public void testKillMultipleUnusedSegmentsWithDifferentMaxUsedStatusLastUpdatedT ) ); - final DateTime maxUsedStatusLastUpdatedTime1 = DateTimes.nowUtc(); - - // Delay for 1s, mark the segments as unused and then capture the last updated time cutoff again - Thread.sleep(1000); + final DateTime lastUpdatedTime1 = DateTimes.nowUtc(); + derbyConnectorRule.segments().updateUsedStatusLastUpdated(segment1.getId().toString(), lastUpdatedTime1); + derbyConnectorRule.segments().updateUsedStatusLastUpdated(segment4.getId().toString(), lastUpdatedTime1); Assert.assertEquals( 2, @@ -708,8 +636,9 @@ public void testKillMultipleUnusedSegmentsWithDifferentMaxUsedStatusLastUpdatedT ) ); - final DateTime maxUsedStatusLastUpdatedTime2 = DateTimes.nowUtc(); - + final DateTime lastUpdatedTime2 = DateTimes.nowUtc(); + derbyConnectorRule.segments().updateUsedStatusLastUpdated(segment2.getId().toString(), lastUpdatedTime2); + derbyConnectorRule.segments().updateUsedStatusLastUpdated(segment3.getId().toString(), lastUpdatedTime2); final List segmentIntervals = segments.stream() .map(DataSegment::getInterval) @@ -717,57 +646,53 @@ public void testKillMultipleUnusedSegmentsWithDifferentMaxUsedStatusLastUpdatedT final Interval umbrellaInterval = JodaUtils.umbrellaInterval(segmentIntervals); - - final KillUnusedSegmentsTask task1 = - new KillUnusedSegmentsTask( - null, - DATA_SOURCE, - umbrellaInterval, - null, - null, - false, - 1, - 10, - maxUsedStatusLastUpdatedTime1 - ); + final KillUnusedSegmentsTask task1 = new KillUnusedSegmentsTaskBuilder() + .dataSource(DATA_SOURCE) + .interval(umbrellaInterval) + .batchSize(1) + .limit(10) + .maxUsedStatusLastUpdatedTime(lastUpdatedTime1) + .build(); Assert.assertEquals(TaskState.SUCCESS, taskRunner.run(task1).get().getStatusCode()); - final List unusedSegments = + final List observedUnusedSegments1 = getMetadataStorageCoordinator().retrieveUnusedSegmentsForInterval( DATA_SOURCE, umbrellaInterval, null, null - ); + ); - Assert.assertEquals(ImmutableList.of(segment2, segment3), unusedSegments); - Assert.assertEquals(new KillTaskReport.Stats(2, 3, 0), getReportedStats()); + Assert.assertEquals(ImmutableList.of(segment2, segment3), observedUnusedSegments1); + Assert.assertEquals( + new KillTaskReport.Stats(2, 3, 0), + getReportedStats() + ); - final KillUnusedSegmentsTask task2 = - new KillUnusedSegmentsTask( - null, + final KillUnusedSegmentsTask task2 = new KillUnusedSegmentsTaskBuilder() + .dataSource(DATA_SOURCE) + .interval(umbrellaInterval) + .batchSize(1) + .limit(10) + .maxUsedStatusLastUpdatedTime(lastUpdatedTime2) + .build(); + + Assert.assertEquals(TaskState.SUCCESS, taskRunner.run(task2).get().getStatusCode()); + + final List observedUnusedSegments2 = + getMetadataStorageCoordinator().retrieveUnusedSegmentsForInterval( DATA_SOURCE, umbrellaInterval, null, - null, - false, - 1, - 10, - maxUsedStatusLastUpdatedTime2 + null ); - Assert.assertEquals(TaskState.SUCCESS, taskRunner.run(task2).get().getStatusCode()); - - final List unusedSegments2 = getMetadataStorageCoordinator().retrieveUnusedSegmentsForInterval( - DATA_SOURCE, - umbrellaInterval, - null, - null + Assert.assertEquals(ImmutableList.of(), observedUnusedSegments2); + Assert.assertEquals( + new KillTaskReport.Stats(2, 3, 0), + getReportedStats() ); - - Assert.assertEquals(ImmutableList.of(), unusedSegments2); - Assert.assertEquals(new KillTaskReport.Stats(2, 3, 0), getReportedStats()); } @Test @@ -790,11 +715,10 @@ public void testKillMultipleUnusedSegmentsWithVersionAndDifferentLastUpdatedTime ) ); - // Capture the last updated time cutoff - final DateTime maxUsedStatusLastUpdatedTime1 = DateTimes.nowUtc(); - - // Delay for 1s, mark the segments as unused and then capture the last updated time cutoff again - Thread.sleep(1000); + final DateTime lastUpdatedTime1 = DateTimes.nowUtc(); + derbyConnectorRule.segments().updateUsedStatusLastUpdated(segment1.getId().toString(), lastUpdatedTime1); + derbyConnectorRule.segments().updateUsedStatusLastUpdated(segment2.getId().toString(), lastUpdatedTime1); + derbyConnectorRule.segments().updateUsedStatusLastUpdated(segment4.getId().toString(), lastUpdatedTime1); Assert.assertEquals( 2, @@ -802,7 +726,9 @@ public void testKillMultipleUnusedSegmentsWithVersionAndDifferentLastUpdatedTime ImmutableSet.of(segment3.getId(), segment5.getId()) ) ); - final DateTime maxUsedStatusLastUpdatedTime2 = DateTimes.nowUtc(); + + final DateTime lastUpdatedTime2 = DateTimes.nowUtc(); + derbyConnectorRule.segments().updateUsedStatusLastUpdated(segment4.getId().toString(), lastUpdatedTime2); final List segmentIntervals = segments.stream() .map(DataSegment::getInterval) @@ -810,18 +736,14 @@ public void testKillMultipleUnusedSegmentsWithVersionAndDifferentLastUpdatedTime final Interval umbrellaInterval = JodaUtils.umbrellaInterval(segmentIntervals); - final KillUnusedSegmentsTask task1 = - new KillUnusedSegmentsTask( - null, - DATA_SOURCE, - umbrellaInterval, - ImmutableList.of(version.toString()), - null, - false, - 1, - 10, - maxUsedStatusLastUpdatedTime1 - ); + final KillUnusedSegmentsTask task1 = new KillUnusedSegmentsTaskBuilder() + .dataSource(DATA_SOURCE) + .interval(umbrellaInterval) + .versions(ImmutableList.of(version.toString())) + .batchSize(1) + .limit(10) + .maxUsedStatusLastUpdatedTime(lastUpdatedTime1) + .build(); Assert.assertEquals(TaskState.SUCCESS, taskRunner.run(task1).get().getStatusCode()); Assert.assertEquals( @@ -839,18 +761,14 @@ public void testKillMultipleUnusedSegmentsWithVersionAndDifferentLastUpdatedTime Assert.assertEquals(ImmutableSet.of(segment3, segment4, segment5), new HashSet<>(observedUnusedSegments)); - final KillUnusedSegmentsTask task2 = - new KillUnusedSegmentsTask( - null, - DATA_SOURCE, - umbrellaInterval, - ImmutableList.of(version.toString()), - null, - false, - 1, - 10, - maxUsedStatusLastUpdatedTime2 - ); + final KillUnusedSegmentsTask task2 = new KillUnusedSegmentsTaskBuilder() + .dataSource(DATA_SOURCE) + .interval(umbrellaInterval) + .versions(ImmutableList.of(version.toString())) + .batchSize(1) + .limit(10) + .maxUsedStatusLastUpdatedTime(lastUpdatedTime2) + .build(); Assert.assertEquals(TaskState.SUCCESS, taskRunner.run(task2).get().getStatusCode()); Assert.assertEquals( @@ -872,151 +790,99 @@ public void testKillMultipleUnusedSegmentsWithVersionAndDifferentLastUpdatedTime @Test public void testKillBatchSizeThree() throws Exception { - final String version = DateTimes.nowUtc().toString(); - final Set segments = ImmutableSet.of( - newSegment(Intervals.of("2019-01-01/2019-02-01"), version), - newSegment(Intervals.of("2019-02-01/2019-03-01"), version), - newSegment(Intervals.of("2019-03-01/2019-04-01"), version), - newSegment(Intervals.of("2019-04-01/2019-05-01"), version) - ); + final Set segments = ImmutableSet.of(segment1, segment2, segment3, segment4); final Set announced = getMetadataStorageCoordinator().commitSegments(segments); - Assert.assertEquals(segments, announced); - final KillUnusedSegmentsTask task = - new KillUnusedSegmentsTask( - null, - DATA_SOURCE, - Intervals.of("2018-01-01/2020-01-01"), - null, - null, - true, - 3, - null, - null - ); + final KillUnusedSegmentsTask task = new KillUnusedSegmentsTaskBuilder() + .dataSource(DATA_SOURCE) + .interval(Intervals.of("2018-01-01/2020-01-01")) + .markAsUnused(true) + .batchSize(3) + .build(); Assert.assertEquals(TaskState.SUCCESS, taskRunner.run(task).get().getStatusCode()); - // we expect ALL tasks to be deleted + final List observedUnusedSegments = + getMetadataStorageCoordinator().retrieveUnusedSegmentsForInterval( + DATA_SOURCE, + Intervals.of("2019/2020"), + null, + null + ); - final List unusedSegments = getMetadataStorageCoordinator().retrieveUnusedSegmentsForInterval( - DATA_SOURCE, - Intervals.of("2019/2020"), - null, - null + Assert.assertEquals(Collections.emptyList(), observedUnusedSegments); + Assert.assertEquals( + new KillTaskReport.Stats(4, 3, 4), + getReportedStats() ); - - Assert.assertEquals(Collections.emptyList(), unusedSegments); - - Assert.assertEquals(new KillTaskReport.Stats(4, 3, 4), getReportedStats()); } @Test public void testComputeNextBatchSizeDefault() { - final KillUnusedSegmentsTask task = - new KillUnusedSegmentsTask( - null, - DATA_SOURCE, - Intervals.of("2018-01-01/2020-01-01"), - null, - null, - false, - null, - null, - null - ); + final KillUnusedSegmentsTask task = new KillUnusedSegmentsTaskBuilder() + .dataSource(DATA_SOURCE) + .interval(Intervals.of("2018-01-01/2020-01-01")) + .build(); Assert.assertEquals(100, task.computeNextBatchSize(50)); } @Test public void testComputeNextBatchSizeWithBatchSizeLargerThanLimit() { - final KillUnusedSegmentsTask task = - new KillUnusedSegmentsTask( - null, - DATA_SOURCE, - Intervals.of("2018-01-01/2020-01-01"), - null, - null, - false, - 10, - 5, - null - ); + final KillUnusedSegmentsTask task = new KillUnusedSegmentsTaskBuilder() + .dataSource(DATA_SOURCE) + .interval(Intervals.of("2018-01-01/2020-01-01")) + .batchSize(10) + .limit(5) + .build(); Assert.assertEquals(5, task.computeNextBatchSize(0)); } @Test public void testComputeNextBatchSizeWithBatchSizeSmallerThanLimit() { - final KillUnusedSegmentsTask task = - new KillUnusedSegmentsTask( - null, - DATA_SOURCE, - Intervals.of("2018-01-01/2020-01-01"), - null, - null, - false, - 5, - 10, - null - ); + final KillUnusedSegmentsTask task = new KillUnusedSegmentsTaskBuilder() + .dataSource(DATA_SOURCE) + .interval(Intervals.of("2018-01-01/2020-01-01")) + .batchSize(5) + .limit(10) + .build(); Assert.assertEquals(5, task.computeNextBatchSize(0)); } @Test public void testComputeNextBatchSizeWithRemainingLessThanLimit() { - final KillUnusedSegmentsTask task = - new KillUnusedSegmentsTask( - null, - DATA_SOURCE, - Intervals.of("2018-01-01/2020-01-01"), - null, - null, - false, - 5, - 10, - null - ); + final KillUnusedSegmentsTask task = new KillUnusedSegmentsTaskBuilder() + .dataSource(DATA_SOURCE) + .interval(Intervals.of("2018-01-01/2020-01-01")) + .batchSize(5) + .limit(10) + .build(); Assert.assertEquals(3, task.computeNextBatchSize(7)); } @Test public void testGetNumTotalBatchesDefault() { - final KillUnusedSegmentsTask task = - new KillUnusedSegmentsTask( - null, - DATA_SOURCE, - Intervals.of("2018-01-01/2020-01-01"), - null, - null, - false, - null, - null, - null - ); + final KillUnusedSegmentsTask task = new KillUnusedSegmentsTaskBuilder() + .dataSource(DATA_SOURCE) + .interval(Intervals.of("2018-01-01/2020-01-01")) + .build(); Assert.assertNull(task.getNumTotalBatches()); } @Test public void testGetNumTotalBatchesWithBatchSizeLargerThanLimit() { - final KillUnusedSegmentsTask task = - new KillUnusedSegmentsTask( - null, - DATA_SOURCE, - Intervals.of("2018-01-01/2020-01-01"), - null, - null, - false, - 10, - 5, - null - ); + final KillUnusedSegmentsTask task = new KillUnusedSegmentsTaskBuilder() + .dataSource(DATA_SOURCE) + .interval(Intervals.of("2018-01-01/2020-01-01")) + .batchSize(10) + .limit(5) + .build(); Assert.assertEquals(1, (int) task.getNumTotalBatches()); } @@ -1026,17 +892,11 @@ public void testInvalidLimit() MatcherAssert.assertThat( Assert.assertThrows( DruidException.class, - () -> new KillUnusedSegmentsTask( - null, - DATA_SOURCE, - Intervals.of("2018-01-01/2020-01-01"), - null, - null, - false, - 10, - 0, - null - ) + () -> new KillUnusedSegmentsTaskBuilder() + .dataSource(DATA_SOURCE) + .interval(Intervals.of("2018-01-01/2020-01-01")) + .limit(0) + .build() ), DruidExceptionMatcher.invalidInput().expectMessageIs( "limit[0] must be a positive integer." @@ -1050,17 +910,11 @@ public void testInvalidBatchSize() MatcherAssert.assertThat( Assert.assertThrows( DruidException.class, - () -> new KillUnusedSegmentsTask( - null, - DATA_SOURCE, - Intervals.of("2018-01-01/2020-01-01"), - null, - null, - false, - 0, - 10, - null - ) + () -> new KillUnusedSegmentsTaskBuilder() + .dataSource(DATA_SOURCE) + .interval(Intervals.of("2018-01-01/2020-01-01")) + .batchSize(0) + .build() ), DruidExceptionMatcher.invalidInput().expectMessageIs( "batchSize[0] must be a positive integer." @@ -1074,17 +928,13 @@ public void testInvalidLimitWithMarkAsUnused() MatcherAssert.assertThat( Assert.assertThrows( DruidException.class, - () -> new KillUnusedSegmentsTask( - null, - DATA_SOURCE, - Intervals.of("2018-01-01/2020-01-01"), - null, - null, - true, - 10, - 10, - null - ) + () -> new KillUnusedSegmentsTaskBuilder() + .dataSource(DATA_SOURCE) + .interval(Intervals.of("2018-01-01/2020-01-01")) + .markAsUnused(true) + .batchSize(10) + .limit(10) + .build() ), DruidExceptionMatcher.invalidInput().expectMessageIs( "limit[10] cannot be provided when markAsUnused is enabled." @@ -1093,22 +943,17 @@ public void testInvalidLimitWithMarkAsUnused() } @Test - public void testInvalidVersionWithMarkAsUnused() + public void testInvalidVersionsWithMarkAsUnused() { MatcherAssert.assertThat( Assert.assertThrows( DruidException.class, - () -> new KillUnusedSegmentsTask( - null, - DATA_SOURCE, - Intervals.of("2018-01-01/2020-01-01"), - ImmutableList.of("foo"), - null, - true, - 10, - null, - null - ) + () -> new KillUnusedSegmentsTaskBuilder() + .dataSource(DATA_SOURCE) + .interval(Intervals.of("2018-01-01/2020-01-01")) + .markAsUnused(true) + .versions(ImmutableList.of("foo")) + .build() ), DruidExceptionMatcher.invalidInput().expectMessageIs( "versions[[foo]] cannot be provided when markAsUnused is enabled." @@ -1119,18 +964,13 @@ public void testInvalidVersionWithMarkAsUnused() @Test public void testGetNumTotalBatchesWithBatchSizeSmallerThanLimit() { - final KillUnusedSegmentsTask task = - new KillUnusedSegmentsTask( - null, - DATA_SOURCE, - Intervals.of("2018-01-01/2020-01-01"), - null, - null, - false, - 5, - 10, - null - ); + final KillUnusedSegmentsTask task = new KillUnusedSegmentsTaskBuilder() + .dataSource(DATA_SOURCE) + .interval(Intervals.of("2018-01-01/2020-01-01")) + .versions(ImmutableList.of("foo")) + .batchSize(5) + .limit(10) + .build(); Assert.assertEquals(2, (int) task.getNumTotalBatches()); } @@ -1152,6 +992,88 @@ public void testKillTaskReportSerde() throws Exception Assert.assertEquals(stats, deserializedKillReport.getPayload()); } + private static class KillUnusedSegmentsTaskBuilder + { + private String id; + private String dataSource; + private Interval interval; + private List versions; + private Map context; + private Boolean markAsUnused; + private Integer batchSize; + private Integer limit; + private DateTime maxUsedStatusLastUpdatedTime; + + public KillUnusedSegmentsTaskBuilder id(String id) + { + this.id = id; + return this; + } + + public KillUnusedSegmentsTaskBuilder dataSource(String dataSource) + { + this.dataSource = dataSource; + return this; + } + + public KillUnusedSegmentsTaskBuilder interval(Interval interval) + { + this.interval = interval; + return this; + } + + public KillUnusedSegmentsTaskBuilder versions(List versions) + { + this.versions = versions; + return this; + } + + public KillUnusedSegmentsTaskBuilder context(Map context) + { + this.context = context; + return this; + } + + public KillUnusedSegmentsTaskBuilder markAsUnused(Boolean markAsUnused) + { + this.markAsUnused = markAsUnused; + return this; + } + + public KillUnusedSegmentsTaskBuilder batchSize(Integer batchSize) + { + this.batchSize = batchSize; + return this; + } + + public KillUnusedSegmentsTaskBuilder limit(Integer limit) + { + this.limit = limit; + return this; + } + + public KillUnusedSegmentsTaskBuilder maxUsedStatusLastUpdatedTime(DateTime maxUsedStatusLastUpdatedTime) + { + this.maxUsedStatusLastUpdatedTime = maxUsedStatusLastUpdatedTime; + return this; + } + + public KillUnusedSegmentsTask build() + { + return new KillUnusedSegmentsTask( + id, + dataSource, + interval, + versions, + context, + markAsUnused, + batchSize, + limit, + maxUsedStatusLastUpdatedTime + ); + } + } + private KillTaskReport.Stats getReportedStats() { try { diff --git a/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java b/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java index bc4a1ddb50db..e63dc9c17e0d 100644 --- a/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java +++ b/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java @@ -422,21 +422,14 @@ public SegmentPublishResult commitSegmentsAndMetadata( @Nullable final DataSourceMetadata endMetadata ) throws IOException { - if (segments.isEmpty()) { - throw new IllegalArgumentException("segment set must not be empty"); - } - - final String dataSource = segments.iterator().next().getDataSource(); - for (DataSegment segment : segments) { - if (!dataSource.equals(segment.getDataSource())) { - throw new IllegalArgumentException("segments must all be from the same dataSource"); - } - } + verifySegmentsToCommit(segments); if ((startMetadata == null && endMetadata != null) || (startMetadata != null && endMetadata == null)) { throw new IllegalArgumentException("start/end metadata pair must be either null or non-null"); } + final String dataSource = segments.iterator().next().getDataSource(); + // Find which segments are used (i.e. not overshadowed). final Set usedSegments = new HashSet<>(); List> segmentHolders = diff --git a/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java b/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java index 31035ca316cd..03de72b96fbb 100644 --- a/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java +++ b/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java @@ -383,17 +383,10 @@ private void markAllSegmentsUnused(Set segments, DateTime usedStatu for (final DataSegment segment : segments) { Assert.assertEquals( 1, - (int) derbyConnector.getDBI().withHandle( - handle -> { - String request = StringUtils.format( - "UPDATE %s SET used = false, used_status_last_updated = :used_status_last_updated WHERE id = :id", - derbyConnectorRule.metadataTablesConfigSupplier().get().getSegmentsTable() - ); - return handle.createStatement(request) - .bind("id", segment.getId().toString()) - .bind("used_status_last_updated", usedStatusLastUpdatedTime.toString() - ).execute(); - } + derbyConnectorRule.segments().update( + "UPDATE %s SET used = false, used_status_last_updated = ? WHERE id = ?", + usedStatusLastUpdatedTime.toString(), + segment.getId().toString() ) ); } diff --git a/server/src/test/java/org/apache/druid/metadata/SQLMetadataConnectorTest.java b/server/src/test/java/org/apache/druid/metadata/SQLMetadataConnectorTest.java index 1c8f6493e22d..d240b83dd07d 100644 --- a/server/src/test/java/org/apache/druid/metadata/SQLMetadataConnectorTest.java +++ b/server/src/test/java/org/apache/druid/metadata/SQLMetadataConnectorTest.java @@ -30,13 +30,11 @@ import org.junit.Before; import org.junit.Rule; import org.junit.Test; -import org.skife.jdbi.v2.Batch; import org.skife.jdbi.v2.DBI; import org.skife.jdbi.v2.Handle; import org.skife.jdbi.v2.exceptions.CallbackFailedException; import org.skife.jdbi.v2.exceptions.UnableToExecuteStatementException; import org.skife.jdbi.v2.exceptions.UnableToObtainConnectionException; -import org.skife.jdbi.v2.tweak.HandleCallback; import java.sql.SQLException; import java.sql.SQLRecoverableException; @@ -47,7 +45,6 @@ import java.util.Collections; import java.util.HashSet; import java.util.List; -import java.util.Locale; import java.util.Map; import java.util.Set; import java.util.stream.Collectors; @@ -175,29 +172,7 @@ public void testGeIndexOnNoTable() public void testAlterSegmentTableAddLastUsed() { connector.createSegmentTable(); - - // Drop column used_status_last_updated to bring us in line with pre-upgrade state - derbyConnectorRule.getConnector().retryWithHandle( - new HandleCallback() - { - @Override - public Void withHandle(Handle handle) - { - final Batch batch = handle.createBatch(); - batch.add( - StringUtils.format( - "ALTER TABLE %1$s DROP COLUMN USED_STATUS_LAST_UPDATED", - derbyConnectorRule.metadataTablesConfigSupplier() - .get() - .getSegmentsTable() - .toUpperCase(Locale.ENGLISH) - ) - ); - batch.execute(); - return null; - } - } - ); + derbyConnectorRule.segments().update("ALTER TABLE %1$s DROP COLUMN USED_STATUS_LAST_UPDATED"); connector.alterSegmentTableAddUsedFlagLastUpdated(); connector.tableHasColumn( diff --git a/server/src/test/java/org/apache/druid/metadata/SqlSegmentsMetadataManagerTest.java b/server/src/test/java/org/apache/druid/metadata/SqlSegmentsMetadataManagerTest.java index 7e491325110b..b177b40c5876 100644 --- a/server/src/test/java/org/apache/druid/metadata/SqlSegmentsMetadataManagerTest.java +++ b/server/src/test/java/org/apache/druid/metadata/SqlSegmentsMetadataManagerTest.java @@ -41,7 +41,6 @@ import org.apache.druid.timeline.SegmentId; import org.apache.druid.timeline.partition.NoneShardSpec; import org.hamcrest.MatcherAssert; -import org.joda.time.DateTime; import org.joda.time.Duration; import org.joda.time.Interval; import org.joda.time.Period; @@ -52,7 +51,6 @@ import org.junit.Test; import java.io.IOException; -import java.util.Locale; import java.util.Set; import java.util.stream.Collectors; @@ -401,7 +399,10 @@ public void testGetUnusedSegmentIntervals() throws IOException "2017-10-15T20:19:12.565Z" ); publishUnusedSegments(koalaSegment1); - updateUsedStatusLastUpdated(koalaSegment1, DateTimes.nowUtc().minus(Duration.standardHours(2))); + derbyConnectorRule.segments().updateUsedStatusLastUpdated( + koalaSegment1.getId().toString(), + DateTimes.nowUtc().minus(Duration.standardHours(2)) + ); // Publish an unused segment with used_status_last_updated 2 days ago final DataSegment koalaSegment2 = createSegment( @@ -410,7 +411,10 @@ public void testGetUnusedSegmentIntervals() throws IOException "2017-10-15T20:19:12.565Z" ); publishUnusedSegments(koalaSegment2); - updateUsedStatusLastUpdated(koalaSegment2, DateTimes.nowUtc().minus(Duration.standardDays(2))); + derbyConnectorRule.segments().updateUsedStatusLastUpdated( + koalaSegment2.getId().toString(), + DateTimes.nowUtc().minus(Duration.standardDays(2)) + ); // Publish an unused segment and set used_status_last_updated to null final DataSegment koalaSegment3 = createSegment( @@ -904,54 +908,35 @@ public void testPopulateUsedFlagLastUpdated() throws IOException Assert.assertEquals(0, getCountOfRowsWithLastUsedNull()); } - private void updateSegmentPayload(DataSegment segment, byte[] payload) - { - executeUpdate( - "UPDATE %1$s SET PAYLOAD = ? WHERE ID = ?", - payload, - segment.getId().toString() - ); - } - private int getCountOfRowsWithLastUsedNull() { return derbyConnectorRule.getConnector().retryWithHandle( handle -> handle.select( StringUtils.format( "SELECT ID FROM %1$s WHERE USED_STATUS_LAST_UPDATED IS NULL", - getSegmentsTable() + derbyConnectorRule.segments().getTableName() ) ).size() ); } - private void updateUsedStatusLastUpdated(DataSegment segment, DateTime newValue) + private void updateSegmentPayload(DataSegment segment, byte[] payload) { - executeUpdate( - "UPDATE %1$s SET USED_STATUS_LAST_UPDATED = ? WHERE ID = ?", - newValue.toString(), + derbyConnectorRule.segments().update( + "UPDATE %1$s SET PAYLOAD = ? WHERE ID = ?", + payload, segment.getId().toString() ); } private void updateUsedStatusLastUpdatedToNull(DataSegment segment) { - executeUpdate( + derbyConnectorRule.segments().update( "UPDATE %1$s SET USED_STATUS_LAST_UPDATED = NULL WHERE ID = ?", segment.getId().toString() ); } - private void executeUpdate(String sqlFormat, Object... args) - { - derbyConnectorRule.getConnector().retryWithHandle( - handle -> handle.update( - StringUtils.format(sqlFormat, getSegmentsTable()), - args - ) - ); - } - /** * Alters the column used_status_last_updated to be nullable. This is used to * test backward compatibility with versions of Druid without this column @@ -959,14 +944,8 @@ private void executeUpdate(String sqlFormat, Object... args) */ private void allowUsedFlagLastUpdatedToBeNullable() { - executeUpdate("ALTER TABLE %1$s ALTER COLUMN USED_STATUS_LAST_UPDATED NULL"); - } - - private String getSegmentsTable() - { - return derbyConnectorRule.metadataTablesConfigSupplier() - .get() - .getSegmentsTable() - .toUpperCase(Locale.ENGLISH); + derbyConnectorRule.segments().update( + "ALTER TABLE %1$s ALTER COLUMN USED_STATUS_LAST_UPDATED NULL" + ); } } diff --git a/server/src/test/java/org/apache/druid/metadata/TestDerbyConnector.java b/server/src/test/java/org/apache/druid/metadata/TestDerbyConnector.java index e5460ce402b4..1fa2ef2302c9 100644 --- a/server/src/test/java/org/apache/druid/metadata/TestDerbyConnector.java +++ b/server/src/test/java/org/apache/druid/metadata/TestDerbyConnector.java @@ -23,12 +23,14 @@ import com.google.common.base.Suppliers; import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.metadata.storage.derby.DerbyConnector; +import org.joda.time.DateTime; import org.junit.Assert; import org.junit.rules.ExternalResource; import org.skife.jdbi.v2.DBI; import org.skife.jdbi.v2.exceptions.UnableToObtainConnectionException; import java.sql.SQLException; +import java.util.Locale; import java.util.UUID; public class TestDerbyConnector extends DerbyConnector @@ -135,5 +137,57 @@ public Supplier metadataTablesConfigSupplier() { return dbTables; } + + public SegmentsTable segments() + { + return new SegmentsTable(this); + } + } + + /** + * A wrapper class for queries on the segments table. + */ + public static class SegmentsTable + { + private final DerbyConnectorRule rule; + + public SegmentsTable(DerbyConnectorRule rule) + { + this.rule = rule; + } + + /** + * Updates the segments table with the supplied SQL query format and arguments. + * + * @param sqlFormat the SQL query format with %s placeholder for the table name and ? for each query {@code args} + * @param args the arguments to be substituted into the SQL query + * @return the number of rows affected by the update operation + */ + public int update(String sqlFormat, Object... args) + { + return this.rule.getConnector().retryWithHandle( + handle -> handle.update( + StringUtils.format(sqlFormat, getTableName()), + args + ) + ); + } + + public int updateUsedStatusLastUpdated(String segmentId, DateTime lastUpdatedTime) + { + return update( + "UPDATE %1$s SET USED_STATUS_LAST_UPDATED = ? WHERE ID = ?", + lastUpdatedTime.toString(), + segmentId + ); + } + + public String getTableName() + { + return this.rule.metadataTablesConfigSupplier() + .get() + .getSegmentsTable() + .toUpperCase(Locale.ENGLISH); + } } } diff --git a/server/src/test/java/org/apache/druid/server/coordinator/duty/KillUnusedSegmentsTest.java b/server/src/test/java/org/apache/druid/server/coordinator/duty/KillUnusedSegmentsTest.java index 81bc6c8c490a..aa37913734ee 100644 --- a/server/src/test/java/org/apache/druid/server/coordinator/duty/KillUnusedSegmentsTest.java +++ b/server/src/test/java/org/apache/druid/server/coordinator/duty/KillUnusedSegmentsTest.java @@ -37,7 +37,6 @@ import org.apache.druid.java.util.common.CloseableIterators; import org.apache.druid.java.util.common.DateTimes; import org.apache.druid.java.util.common.Intervals; -import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.common.parsers.CloseableIterator; import org.apache.druid.metadata.SQLMetadataSegmentPublisher; import org.apache.druid.metadata.SegmentsMetadataManagerConfig; @@ -69,7 +68,6 @@ import java.util.Collections; import java.util.HashMap; import java.util.List; -import java.util.Locale; import java.util.Map; public class KillUnusedSegmentsTest @@ -776,7 +774,7 @@ private void createAndAddUnusedSegment( throw new RuntimeException(e); } sqlSegmentsMetadataManager.markSegmentsAsUnused(ImmutableSet.of(segment.getId())); - updateUsedStatusLastUpdated(segment, lastUpdatedTime); + derbyConnectorRule.segments().updateUsedStatusLastUpdated(segment.getId().toString(), lastUpdatedTime); } private DataSegment createSegment(final String dataSource, final Interval interval, final String version) @@ -925,25 +923,4 @@ void deleteLastKillTaskId(final String dataSource) observedDatasourceToLastKillTaskId.remove(dataSource); } } - - private void updateUsedStatusLastUpdated(DataSegment segment, DateTime lastUpdatedTime) - { - derbyConnectorRule.getConnector().retryWithHandle( - handle -> handle.update( - StringUtils.format( - "UPDATE %1$s SET USED_STATUS_LAST_UPDATED = ? WHERE ID = ?", getSegmentsTable() - ), - lastUpdatedTime.toString(), - segment.getId().toString() - ) - ); - } - - private String getSegmentsTable() - { - return derbyConnectorRule.metadataTablesConfigSupplier() - .get() - .getSegmentsTable() - .toUpperCase(Locale.ENGLISH); - } }