diff --git a/integration-tests/src/main/java/org/apache/druid/testing/clients/CoordinatorResourceTestClient.java b/integration-tests/src/main/java/org/apache/druid/testing/clients/CoordinatorResourceTestClient.java index 37bd2f30531f..b369ac34b7a9 100644 --- a/integration-tests/src/main/java/org/apache/druid/testing/clients/CoordinatorResourceTestClient.java +++ b/integration-tests/src/main/java/org/apache/druid/testing/clients/CoordinatorResourceTestClient.java @@ -31,6 +31,7 @@ import org.apache.druid.java.util.http.client.Request; import org.apache.druid.java.util.http.client.response.StatusResponseHandler; import org.apache.druid.java.util.http.client.response.StatusResponseHolder; +import org.apache.druid.metadata.SqlSegmentsMetadataManager; import org.apache.druid.query.lookup.LookupsState; import org.apache.druid.server.coordinator.CoordinatorDynamicConfig; import org.apache.druid.server.lookup.cache.LookupExtractorFactoryMapContainer; @@ -190,6 +191,14 @@ private Map getLoadStatus() return status; } + /** + * Warning: This API reads segments from {@link SqlSegmentsMetadataManager} of the Coordinator which + * caches segments in memory and periodically updates them. Hence, there can be a race condition as + * this API implementation compares segments metadata from cache with segments in historicals. + * Particularly, when number of segment changes after the first initial load of the datasource. + * Workaround is to verify the number of segments matches expected from {@link #getSegments(String) getSegments} + * before calling this method (since, that would wait until the cache is updated with expected data) + */ public boolean areSegmentsLoaded(String dataSource) { final Map status = getLoadStatus(); diff --git a/integration-tests/src/test/java/org/apache/druid/tests/coordinator/duty/ITAutoCompactionTest.java b/integration-tests/src/test/java/org/apache/druid/tests/coordinator/duty/ITAutoCompactionTest.java index 592c91034daf..6c57e219c1c7 100644 --- a/integration-tests/src/test/java/org/apache/druid/tests/coordinator/duty/ITAutoCompactionTest.java +++ b/integration-tests/src/test/java/org/apache/druid/tests/coordinator/duty/ITAutoCompactionTest.java @@ -87,17 +87,15 @@ public void testAutoCompactionDutySubmitAndVerifyCompaction() throws Exception verifyQuery(INDEX_QUERIES_RESOURCE); submitCompactionConfig(MAX_ROWS_PER_SEGMENT_COMPACTED, Period.days(1)); - forceTriggerAutoCompaction(); //...compacted into 1 new segment for 1 day. 1 day compacted and 1 day skipped/remains uncompacted. (5 total) - verifySegmentsCount(5); + forceTriggerAutoCompaction(5); verifyQuery(INDEX_QUERIES_RESOURCE); verifySegmentsCompacted(1, MAX_ROWS_PER_SEGMENT_COMPACTED); checkCompactionIntervals(intervalsBeforeCompaction); submitCompactionConfig(MAX_ROWS_PER_SEGMENT_COMPACTED, SKIP_OFFSET_FROM_LATEST); - forceTriggerAutoCompaction(); //...compacted into 1 new segment for the remaining one day. 2 day compacted and 0 day uncompacted. (6 total) - verifySegmentsCount(6); + forceTriggerAutoCompaction(6); verifyQuery(INDEX_QUERIES_RESOURCE); verifySegmentsCompacted(2, MAX_ROWS_PER_SEGMENT_COMPACTED); checkCompactionIntervals(intervalsBeforeCompaction); @@ -119,11 +117,10 @@ public void testAutoCompactionDutyCanUpdateCompactionConfig() throws Exception submitCompactionConfig(10000, SKIP_OFFSET_FROM_LATEST); // New compaction config should overwrites the existing compaction config submitCompactionConfig(1, SKIP_OFFSET_FROM_LATEST); - forceTriggerAutoCompaction(); // Instead of merging segments, the updated config will split segments! //...compacted into 10 new segments across 2 days. 5 new segments each day (14 total) - verifySegmentsCount(14); + forceTriggerAutoCompaction(14); verifyQuery(INDEX_QUERIES_RESOURCE); verifySegmentsCompacted(10, 1); @@ -144,10 +141,9 @@ public void testAutoCompactionDutyCanDeleteCompactionConfig() throws Exception submitCompactionConfig(MAX_ROWS_PER_SEGMENT_COMPACTED, SKIP_OFFSET_FROM_LATEST); deleteCompactionConfig(); - forceTriggerAutoCompaction(); // ...should remains unchanged (4 total) - verifySegmentsCount(4); + forceTriggerAutoCompaction(4); verifyQuery(INDEX_QUERIES_RESOURCE); verifySegmentsCompacted(0, null); @@ -171,35 +167,31 @@ public void testAutoCompactionDutyCanUpdateTaskSlots() throws Exception // Set compactionTaskSlotRatio to 0 to prevent any compaction updateCompactionTaskSlot(0, 100); - forceTriggerAutoCompaction(); // ...should remains unchanged (4 total) - verifySegmentsCount(4); + forceTriggerAutoCompaction(4); verifyQuery(INDEX_QUERIES_RESOURCE); verifySegmentsCompacted(0, null); checkCompactionIntervals(intervalsBeforeCompaction); // Set maxCompactionTaskSlots to 0 to prevent any compaction updateCompactionTaskSlot(0.1, 0); - forceTriggerAutoCompaction(); // ...should remains unchanged (4 total) - verifySegmentsCount(4); + forceTriggerAutoCompaction(4); verifyQuery(INDEX_QUERIES_RESOURCE); verifySegmentsCompacted(0, null); checkCompactionIntervals(intervalsBeforeCompaction); // Update compaction slots to be 1 updateCompactionTaskSlot(1, 1); - forceTriggerAutoCompaction(); // One day compacted (1 new segment) and one day remains uncompacted. (5 total) - verifySegmentsCount(5); + forceTriggerAutoCompaction(5); verifyQuery(INDEX_QUERIES_RESOURCE); verifySegmentsCompacted(1, MAX_ROWS_PER_SEGMENT_COMPACTED); checkCompactionIntervals(intervalsBeforeCompaction); Assert.assertEquals(compactionResource.getCompactionProgress(fullDatasourceName).get("remainingSegmentSize"), "14312"); // Run compaction again to compact the remaining day - forceTriggerAutoCompaction(); // Remaining day compacted (1 new segment). Now both days compacted (6 total) - verifySegmentsCount(6); + forceTriggerAutoCompaction(6); verifyQuery(INDEX_QUERIES_RESOURCE); verifySegmentsCompacted(2, MAX_ROWS_PER_SEGMENT_COMPACTED); checkCompactionIntervals(intervalsBeforeCompaction); @@ -284,10 +276,11 @@ private void deleteCompactionConfig() throws Exception Assert.assertNull(foundDataSourceCompactionConfig); } - private void forceTriggerAutoCompaction() throws Exception + private void forceTriggerAutoCompaction(int numExpectedSegmentsAfterCompaction) throws Exception { compactionResource.forceTriggerAutoCompaction(); waitForAllTasksToComplete(); + verifySegmentsCount(numExpectedSegmentsAfterCompaction); ITRetryUtil.retryUntilTrue( () -> coordinator.areSegmentsLoaded(fullDatasourceName), "Segment Compaction"