Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.apache.druid.java.util.http.client.Request;
import org.apache.druid.java.util.http.client.response.StatusResponseHandler;
import org.apache.druid.java.util.http.client.response.StatusResponseHolder;
import org.apache.druid.metadata.SqlSegmentsMetadataManager;
import org.apache.druid.query.lookup.LookupsState;
import org.apache.druid.server.coordinator.CoordinatorDynamicConfig;
import org.apache.druid.server.lookup.cache.LookupExtractorFactoryMapContainer;
Expand Down Expand Up @@ -190,6 +191,14 @@ private Map<String, Integer> getLoadStatus()
return status;
}

/**
* Warning: This API reads segments from the {@link SqlSegmentsMetadataManager} of the Coordinator, which
* caches segments in memory and updates them periodically. Hence, there can be a race condition, as
* this API implementation compares segment metadata from the cache with the segments in historicals —
* particularly when the number of segments changes after the initial load of the datasource.
* The workaround is to verify that the number of segments matches the expected count via {@link #getSegments(String) getSegments}
* before calling this method (since that call waits until the cache is updated with the expected data)
*/
public boolean areSegmentsLoaded(String dataSource)
{
final Map<String, Integer> status = getLoadStatus();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,17 +87,15 @@ public void testAutoCompactionDutySubmitAndVerifyCompaction() throws Exception
verifyQuery(INDEX_QUERIES_RESOURCE);

submitCompactionConfig(MAX_ROWS_PER_SEGMENT_COMPACTED, Period.days(1));
forceTriggerAutoCompaction();
//...compacted into 1 new segment for 1 day. 1 day compacted and 1 day skipped/left uncompacted. (5 total)
verifySegmentsCount(5);
forceTriggerAutoCompaction(5);
verifyQuery(INDEX_QUERIES_RESOURCE);
verifySegmentsCompacted(1, MAX_ROWS_PER_SEGMENT_COMPACTED);
checkCompactionIntervals(intervalsBeforeCompaction);

submitCompactionConfig(MAX_ROWS_PER_SEGMENT_COMPACTED, SKIP_OFFSET_FROM_LATEST);
forceTriggerAutoCompaction();
//...compacted into 1 new segment for the remaining day. 2 days compacted and 0 days uncompacted. (6 total)
verifySegmentsCount(6);
forceTriggerAutoCompaction(6);
verifyQuery(INDEX_QUERIES_RESOURCE);
verifySegmentsCompacted(2, MAX_ROWS_PER_SEGMENT_COMPACTED);
checkCompactionIntervals(intervalsBeforeCompaction);
Expand All @@ -119,11 +117,10 @@ public void testAutoCompactionDutyCanUpdateCompactionConfig() throws Exception
submitCompactionConfig(10000, SKIP_OFFSET_FROM_LATEST);
// New compaction config should overwrite the existing compaction config
submitCompactionConfig(1, SKIP_OFFSET_FROM_LATEST);
forceTriggerAutoCompaction();

// Instead of merging segments, the updated config will split segments!
//...compacted into 10 new segments across 2 days. 5 new segments each day (14 total)
verifySegmentsCount(14);
forceTriggerAutoCompaction(14);
verifyQuery(INDEX_QUERIES_RESOURCE);
verifySegmentsCompacted(10, 1);

Expand All @@ -144,10 +141,9 @@ public void testAutoCompactionDutyCanDeleteCompactionConfig() throws Exception

submitCompactionConfig(MAX_ROWS_PER_SEGMENT_COMPACTED, SKIP_OFFSET_FROM_LATEST);
deleteCompactionConfig();
forceTriggerAutoCompaction();

// ...should remain unchanged (4 total)
verifySegmentsCount(4);
forceTriggerAutoCompaction(4);
verifyQuery(INDEX_QUERIES_RESOURCE);
verifySegmentsCompacted(0, null);

Expand All @@ -171,35 +167,31 @@ public void testAutoCompactionDutyCanUpdateTaskSlots() throws Exception

// Set compactionTaskSlotRatio to 0 to prevent any compaction
updateCompactionTaskSlot(0, 100);
forceTriggerAutoCompaction();
// ...should remain unchanged (4 total)
verifySegmentsCount(4);
forceTriggerAutoCompaction(4);
verifyQuery(INDEX_QUERIES_RESOURCE);
verifySegmentsCompacted(0, null);
checkCompactionIntervals(intervalsBeforeCompaction);

// Set maxCompactionTaskSlots to 0 to prevent any compaction
updateCompactionTaskSlot(0.1, 0);
forceTriggerAutoCompaction();
// ...should remain unchanged (4 total)
verifySegmentsCount(4);
forceTriggerAutoCompaction(4);
verifyQuery(INDEX_QUERIES_RESOURCE);
verifySegmentsCompacted(0, null);
checkCompactionIntervals(intervalsBeforeCompaction);

// Update compaction slots to be 1
updateCompactionTaskSlot(1, 1);
forceTriggerAutoCompaction();
// One day compacted (1 new segment) and one day remains uncompacted. (5 total)
verifySegmentsCount(5);
forceTriggerAutoCompaction(5);
verifyQuery(INDEX_QUERIES_RESOURCE);
verifySegmentsCompacted(1, MAX_ROWS_PER_SEGMENT_COMPACTED);
checkCompactionIntervals(intervalsBeforeCompaction);
Assert.assertEquals(compactionResource.getCompactionProgress(fullDatasourceName).get("remainingSegmentSize"), "14312");
// Run compaction again to compact the remaining day
forceTriggerAutoCompaction();
// Remaining day compacted (1 new segment). Now both days compacted (6 total)
verifySegmentsCount(6);
forceTriggerAutoCompaction(6);
verifyQuery(INDEX_QUERIES_RESOURCE);
verifySegmentsCompacted(2, MAX_ROWS_PER_SEGMENT_COMPACTED);
checkCompactionIntervals(intervalsBeforeCompaction);
Expand Down Expand Up @@ -284,10 +276,11 @@ private void deleteCompactionConfig() throws Exception
Assert.assertNull(foundDataSourceCompactionConfig);
}

private void forceTriggerAutoCompaction() throws Exception
private void forceTriggerAutoCompaction(int numExpectedSegmentsAfterCompaction) throws Exception
{
compactionResource.forceTriggerAutoCompaction();
waitForAllTasksToComplete();
verifySegmentsCount(numExpectedSegmentsAfterCompaction);
ITRetryUtil.retryUntilTrue(
() -> coordinator.areSegmentsLoaded(fullDatasourceName),
"Segment Compaction"
Expand Down