Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -692,9 +692,16 @@ public RowSignature buildDataSourceRowSignature(final String dataSource)
RowSignature rowSignature = optionalSchema.get().getSchemaPayload().getRowSignature();
mergeRowSignature(columnTypes, rowSignature);
} else {
// mark it for refresh, however, this case shouldn't arise by design
markSegmentAsNeedRefresh(segmentId);
log.debug("SchemaMetadata for segmentId [%s] is absent.", segmentId);

ImmutableDruidDataSource druidDataSource =
sqlSegmentsMetadataManager.getImmutableDataSourceWithUsedSegments(segmentId.getDataSource());

if (druidDataSource != null && druidDataSource.getSegment(segmentId) != null) {
// mark it for refresh only if it is used
// however, this case shouldn't arise by design
markSegmentAsNeedRefresh(segmentId);
}
}
}
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2290,6 +2290,102 @@ public void addSegment(final DruidServerMetadata server, final DataSegment segme
Assert.assertEquals(0, metadatas.size());
}

@Test
public void testUnusedSegmentIsNotRefreshed() throws InterruptedException, IOException
{
  final String dataSource = "xyz";
  final CountDownLatch refreshLatch = new CountDownLatch(1);

  // Subclass the cache so the test can wait for the initial refresh to complete.
  final CoordinatorSegmentMetadataCache schema = new CoordinatorSegmentMetadataCache(
      getQueryLifecycleFactory(walker),
      serverView,
      SEGMENT_CACHE_CONFIG_DEFAULT,
      new NoopEscalator(),
      new InternalQueryConfig(),
      new NoopServiceEmitter(),
      segmentSchemaCache,
      backFillQueue,
      sqlSegmentsMetadataManager,
      segmentsMetadataManagerConfigSupplier
  )
  {
    @Override
    public void refresh(Set<SegmentId> segmentsToRefresh, Set<String> dataSourcesToRebuild) throws IOException
    {
      super.refresh(segmentsToRefresh, dataSourcesToRebuild);
      refreshLatch.countDown();
    }
  };

  final List<DataSegment> segments = ImmutableList.of(
      newSegment(dataSource, 1),
      newSegment(dataSource, 2),
      newSegment(dataSource, 3)
  );

  final DruidServer historicalServer = druidServers.stream()
                                                   .filter(s -> s.getType().equals(ServerType.HISTORICAL))
                                                   .findAny()
                                                   .orElse(null);
  Assert.assertNotNull(historicalServer);
  final DruidServerMetadata historicalServerMetadata = historicalServer.getMetadata();

  // All three segments share a single finalized schema fingerprint initially.
  final ImmutableMap<String, SchemaPayload> schemaPayloadMap = ImmutableMap.of(
      "fp",
      new SchemaPayload(RowSignature.builder().add("c1", ColumnType.DOUBLE).build())
  );
  segmentSchemaCache.updateFinalizedSegmentSchema(
      new SegmentSchemaCache.FinalizedSegmentSchemaInfo(
          ImmutableMap.of(
              segments.get(0).getId(), new SegmentMetadata(20L, "fp"),
              segments.get(1).getId(), new SegmentMetadata(20L, "fp"),
              segments.get(2).getId(), new SegmentMetadata(20L, "fp")
          ),
          schemaPayloadMap
      )
  );

  for (DataSegment segment : segments) {
    schema.addSegment(historicalServerMetadata, segment);
    serverView.addSegment(segment, ServerType.HISTORICAL);
  }

  schema.onLeaderStart();
  schema.awaitInitialization();

  Assert.assertTrue(refreshLatch.await(2, TimeUnit.SECONDS));

  // Drop the schema for segment2 and segment3 so both look absent from the cache;
  // only segment1 keeps its finalized schema.
  segmentSchemaCache.updateFinalizedSegmentSchema(
      new SegmentSchemaCache.FinalizedSegmentSchemaInfo(
          ImmutableMap.of(segments.get(0).getId(), new SegmentMetadata(20L, "fp")),
          schemaPayloadMap
      )
  );

  // The metadata store reports only segment1 and segment2 as used; segment3 is unused.
  final ImmutableDruidDataSource druidDataSource =
      new ImmutableDruidDataSource(
          "xyz",
          Collections.emptyMap(),
          ImmutableMap.of(
              segments.get(0).getId(), segments.get(0),
              segments.get(1).getId(), segments.get(1)
          )
      );
  Mockito.when(sqlSegmentsMetadataManager.getImmutableDataSourceWithUsedSegments(ArgumentMatchers.anyString()))
         .thenReturn(druidDataSource);

  // Explicitly refresh only segment1; segment2 and segment3 lack schema and should be
  // considered for the needs-refresh set.
  final Set<SegmentId> segmentsToRefresh = Sets.newHashSet(segments.get(0).getId());
  schema.refresh(segmentsToRefresh, Sets.newHashSet(dataSource));

  // segment2 is used but has no schema -> marked for refresh; segment3 is unused -> skipped.
  Assert.assertTrue(schema.getSegmentsNeedingRefresh().contains(segments.get(1).getId()));
  Assert.assertFalse(schema.getSegmentsNeedingRefresh().contains(segments.get(2).getId()));
}

private void verifyFooDSSchema(CoordinatorSegmentMetadataCache schema, int columns)
{
final DataSourceInformation fooDs = schema.getDatasource("foo");
Expand Down