@@ -102,6 +102,13 @@
* <p>
* This class has an abstract method {@link #refresh(Set, Set)} which the child class must override
* with the logic to build and cache table schema.
* <p>
* Note on handling tombstone segments:
* These segments lack data and column information.
* Additionally, segment metadata queries, which are not yet implemented for tombstone segments
* (see: https://github.com/apache/druid/pull/12137), do not provide metadata for tombstones,
* which would lead to indefinite refresh attempts for these segments.
* Therefore, tombstone segments are never added to the set of segments being refreshed.
*
* @param <T> The type of information associated with the data source, which must extend {@link DataSourceInformation}.
*/
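To make the gating described above concrete, here is a small, self-contained sketch. The types below are simplified stand-ins for Druid's DataSegment and cache classes, not the actual API; only isTombstone() mirrors a real DataSegment method:

import java.util.HashSet;
import java.util.Set;

// Simplified stand-in for a Druid segment, for illustration only.
record SegmentStub(String id, boolean isTombstone) {}

// Toy analogue of AbstractSegmentMetadataCache: subclasses supply the refresh
// logic, and tombstone segments never reach the refresh set.
abstract class MetadataCacheSketch
{
  protected final Set<String> segmentsNeedingRefresh = new HashSet<>();

  void addSegment(SegmentStub segment)
  {
    // Tombstones are still tracked by the cache, but never queued for refresh:
    // metadata queries return nothing for them, so a refresh would retry forever.
    if (!segment.isTombstone()) {
      segmentsNeedingRefresh.add(segment.id());
    }
  }

  // Child classes override this with the logic to build and cache table schema.
  abstract void refresh(Set<String> segmentsToRefresh);
}

class LoggingCacheSketch extends MetadataCacheSketch
{
  @Override
  void refresh(Set<String> segmentsToRefresh)
  {
    segmentsToRefresh.forEach(id -> System.out.println("refreshing " + id));
  }

  public static void main(String[] args)
  {
    LoggingCacheSketch cache = new LoggingCacheSketch();
    cache.addSegment(new SegmentStub("ds_2012-01-01_v1", false));
    cache.addSegment(new SegmentStub("ds_2012-01-02_v1_tombstone", true));
    cache.refresh(cache.segmentsNeedingRefresh); // prints only the non-tombstone id
  }
}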
@@ -478,13 +485,6 @@ public int getTotalSegments()
@VisibleForTesting
public void addSegment(final DruidServerMetadata server, final DataSegment segment)
{
// Skip adding tombstone segment to the cache. These segments lack data or column information.
// Additionally, segment metadata queries, which are not yet implemented for tombstone segments
// (see: https://github.com/apache/druid/pull/12137) do not provide metadata for tombstones,
// leading to indefinite refresh attempts for these segments.
if (segment.isTombstone()) {
return;
}
// Get lock first so that we won't wait in ConcurrentMap.compute().
synchronized (lock) {
// someday we could hypothetically remove broker special casing, whenever BrokerServerView supports tracking
@@ -511,7 +511,11 @@ public void addSegment(final DruidServerMetadata server, final DataSegment segment)
segmentMetadata = AvailableSegmentMetadata
.builder(segment, isRealtime, ImmutableSet.of(server), null, DEFAULT_NUM_ROWS) // Added without needing a refresh
.build();
markSegmentAsNeedRefresh(segment.getId());
if (segment.isTombstone()) {
log.debug("Skipping refresh for tombstone segment.");
} else {
markSegmentAsNeedRefresh(segment.getId());
}
if (!server.isSegmentReplicationTarget()) {
log.debug("Added new mutable segment [%s].", segment.getId());
markSegmentAsMutable(segment.getId());
@@ -557,10 +561,6 @@ public void addSegment(final DruidServerMetadata server, final DataSegment segment)
@VisibleForTesting
public void removeSegment(final DataSegment segment)
{
// tombstone segments are not present in the cache
if (segment.isTombstone()) {
return;
}
// Get lock first so that we won't wait in ConcurrentMap.compute().
synchronized (lock) {
log.debug("Segment [%s] is gone.", segment.getId());
@@ -374,9 +374,7 @@ public Iterator<AvailableSegmentMetadata> iterateSegmentMetadata()
.withNumRows(metadata.get().getNumRows())
.build();
} else {
// mark it for refresh, however, this case shouldn't arise by design
markSegmentAsNeedRefresh(segmentId);
log.debug("SchemaMetadata for segmentId[%s] is absent.", segmentId);
markSegmentForRefreshIfNeeded(availableSegmentMetadata.getSegment());
return availableSegmentMetadata;
}
}
@@ -403,9 +401,7 @@ public AvailableSegmentMetadata getAvailableSegmentMetadata(String datasource, SegmentId segmentId)
.withNumRows(metadata.get().getNumRows())
.build();
} else {
// mark it for refresh, however, this case shouldn't arise by design
markSegmentAsNeedRefresh(segmentId);
log.debug("SchemaMetadata for segmentId [%s] is absent.", segmentId);
markSegmentForRefreshIfNeeded(availableSegmentMetadata.getSegment());
}
return availableSegmentMetadata;
}
@@ -686,22 +682,14 @@ public RowSignature buildDataSourceRowSignature(final String dataSource)
final Map<String, ColumnType> columnTypes = new LinkedHashMap<>();

if (segmentsMap != null && !segmentsMap.isEmpty()) {
for (SegmentId segmentId : segmentsMap.keySet()) {
for (Map.Entry<SegmentId, AvailableSegmentMetadata> entry : segmentsMap.entrySet()) {
SegmentId segmentId = entry.getKey();
Optional<SchemaPayloadPlus> optionalSchema = segmentSchemaCache.getSchemaForSegment(segmentId);
if (optionalSchema.isPresent()) {
RowSignature rowSignature = optionalSchema.get().getSchemaPayload().getRowSignature();
mergeRowSignature(columnTypes, rowSignature);
} else {
log.debug("SchemaMetadata for segmentId [%s] is absent.", segmentId);

ImmutableDruidDataSource druidDataSource =
sqlSegmentsMetadataManager.getImmutableDataSourceWithUsedSegments(segmentId.getDataSource());

if (druidDataSource != null && druidDataSource.getSegment(segmentId) != null) {
// mark it for refresh only if it is used
// however, this case shouldn't arise by design
markSegmentAsNeedRefresh(segmentId);
}
markSegmentForRefreshIfNeeded(entry.getValue().getSegment());
}
}
} else {
@@ -876,4 +864,32 @@ Optional<RowSignature> mergeOrCreateRowSignature(
return Optional.empty();
}
}

/**
* A segment schema can go missing. To ensure smooth functioning, the segment is marked for refresh.
* It need not be refreshed in the following scenarios:
* - Tombstone segments, since they do not have any schema.
* - Unused segments which have not yet been removed from the cache.
* Any other scenario warrants investigation.
*/
private void markSegmentForRefreshIfNeeded(DataSegment segment)
{
SegmentId id = segment.getId();

log.debug("SchemaMetadata for segmentId [%s] is absent.", id);

if (segment.isTombstone()) {
log.debug("Skipping refresh for tombstone segment [%s].", id);
return;
}

ImmutableDruidDataSource druidDataSource =
sqlSegmentsMetadataManager.getImmutableDataSourceWithUsedSegments(segment.getDataSource());

if (druidDataSource != null && druidDataSource.getSegment(id) != null) {
markSegmentAsNeedRefresh(id);
} else {
log.debug("Skipping refresh for unused segment [%s].", id);
}
}
}
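The decision implemented in markSegmentForRefreshIfNeeded above boils down to a small predicate. As a hypothetical, self-contained reference, the sketch below uses a plain set of used segment IDs standing in for the sqlSegmentsMetadataManager lookup:

import java.util.Set;

// Hypothetical mirror of markSegmentForRefreshIfNeeded's decision:
// only used, non-tombstone segments are queued for a schema refresh.
final class RefreshDecisionSketch
{
  private final Set<String> usedSegmentIds;

  RefreshDecisionSketch(Set<String> usedSegmentIds)
  {
    this.usedSegmentIds = usedSegmentIds;
  }

  boolean shouldRefresh(String segmentId, boolean isTombstone)
  {
    if (isTombstone) {
      return false; // tombstones have no schema to fetch
    }
    return usedSegmentIds.contains(segmentId); // unused segments are skipped
  }

  public static void main(String[] args)
  {
    RefreshDecisionSketch sketch = new RefreshDecisionSketch(Set.of("used-segment"));
    System.out.println(sketch.shouldRefresh("used-segment", false));   // true
    System.out.println(sketch.shouldRefresh("used-segment", true));    // false
    System.out.println(sketch.shouldRefresh("unused-segment", false)); // false
  }
}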
@@ -32,6 +32,7 @@
import org.apache.druid.client.ImmutableDruidDataSource;
import org.apache.druid.client.InternalQueryConfig;
import org.apache.druid.data.input.InputRow;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.Pair;
import org.apache.druid.java.util.common.StringUtils;
@@ -2220,74 +2221,109 @@ protected void coldDatasourceSchemaExec()
}

@Test
public void testTombstoneSegmentIsNotAdded() throws InterruptedException
public void testTombstoneSegmentIsNotRefreshed() throws IOException
{
String datasource = "newSegmentAddTest";
CountDownLatch addSegmentLatch = new CountDownLatch(1);
String brokerInternalQueryConfigJson = "{\"context\": { \"priority\": 5} }";

TestHelper.makeJsonMapper();
InternalQueryConfig internalQueryConfig = MAPPER.readValue(
MAPPER.writeValueAsString(
MAPPER.readValue(brokerInternalQueryConfigJson, InternalQueryConfig.class)
),
InternalQueryConfig.class
);

QueryLifecycleFactory factoryMock = EasyMock.createMock(QueryLifecycleFactory.class);
QueryLifecycle lifecycleMock = EasyMock.createMock(QueryLifecycle.class);

CoordinatorSegmentMetadataCache schema = new CoordinatorSegmentMetadataCache(
getQueryLifecycleFactory(walker),
factoryMock,
serverView,
SEGMENT_CACHE_CONFIG_DEFAULT,
new NoopEscalator(),
new InternalQueryConfig(),
internalQueryConfig,
new NoopServiceEmitter(),
segmentSchemaCache,
backFillQueue,
sqlSegmentsMetadataManager,
segmentsMetadataManagerConfigSupplier
)
{
@Override
public void addSegment(final DruidServerMetadata server, final DataSegment segment)
{
super.addSegment(server, segment);
if (datasource.equals(segment.getDataSource())) {
addSegmentLatch.countDown();
}
}
};
);

schema.onLeaderStart();
schema.awaitInitialization();
Map<String, Object> queryContext = ImmutableMap.of(
QueryContexts.PRIORITY_KEY, 5,
QueryContexts.BROKER_PARALLEL_MERGE_KEY, false
);

DataSegment segment = new DataSegment(
datasource,
Intervals.of("2001/2002"),
"1",
Collections.emptyMap(),
Collections.emptyList(),
Collections.emptyList(),
TombstoneShardSpec.INSTANCE,
null,
DataSegment segment = newSegment("test", 0);
DataSegment tombstone = DataSegment.builder()
.dataSource("test")
.interval(Intervals.of("2012-01-01/2012-01-02"))
.version(DateTimes.of("2012-01-01T11:22:33.444Z").toString())
.shardSpec(new TombstoneShardSpec())
.loadSpec(Collections.singletonMap(
"type",
DataSegment.TOMBSTONE_LOADSPEC_TYPE
))
.size(0)
.build();

final DruidServer historicalServer = druidServers.stream()
.filter(s -> s.getType().equals(ServerType.HISTORICAL))
.findAny()
.orElse(null);

Assert.assertNotNull(historicalServer);
final DruidServerMetadata historicalServerMetadata = historicalServer.getMetadata();

schema.addSegment(historicalServerMetadata, segment);
schema.addSegment(historicalServerMetadata, tombstone);
Assert.assertFalse(schema.getSegmentsNeedingRefresh().contains(tombstone.getId()));

List<SegmentId> segmentIterable = ImmutableList.of(segment.getId(), tombstone.getId());

SegmentMetadataQuery expectedMetadataQuery = new SegmentMetadataQuery(
new TableDataSource(segment.getDataSource()),
new MultipleSpecificSegmentSpec(
segmentIterable.stream()
.filter(id -> !id.equals(tombstone.getId()))
.map(SegmentId::toDescriptor)
.collect(Collectors.toList())
),
new AllColumnIncluderator(),
false,
queryContext,
EnumSet.of(SegmentMetadataQuery.AnalysisType.AGGREGATORS),
false,
null,
0
null
);

Assert.assertEquals(6, schema.getTotalSegments());
EasyMock.expect(factoryMock.factorize()).andReturn(lifecycleMock).once();
EasyMock.expect(lifecycleMock.runSimple(expectedMetadataQuery, AllowAllAuthenticator.ALLOW_ALL_RESULT, Access.OK))
.andReturn(QueryResponse.withEmptyContext(Sequences.empty())).once();

serverView.addSegment(segment, ServerType.HISTORICAL);
Assert.assertTrue(addSegmentLatch.await(1, TimeUnit.SECONDS));
Assert.assertEquals(0, addSegmentLatch.getCount());
EasyMock.replay(factoryMock, lifecycleMock);

Assert.assertEquals(6, schema.getTotalSegments());
List<AvailableSegmentMetadata> metadatas = schema
.getSegmentMetadataSnapshot()
.values()
.stream()
.filter(metadata -> datasource.equals(metadata.getSegment().getDataSource()))
.collect(Collectors.toList());
Assert.assertEquals(0, metadatas.size());
schema.refresh(Collections.singleton(segment.getId()), Collections.singleton("test"));

serverView.removeSegment(segment, ServerType.HISTORICAL);
Assert.assertEquals(6, schema.getTotalSegments());
metadatas = schema
.getSegmentMetadataSnapshot()
.values()
.stream()
.filter(metadata -> datasource.equals(metadata.getSegment().getDataSource()))
.collect(Collectors.toList());
Assert.assertEquals(0, metadatas.size());
// verify that metadata query is not issued for tombstone segment
EasyMock.verify(factoryMock, lifecycleMock);

// Verify that datasource schema building logic doesn't mark the tombstone segment for refresh
Assert.assertFalse(schema.getSegmentsNeedingRefresh().contains(tombstone.getId()));

AvailableSegmentMetadata availableSegmentMetadata = schema.getAvailableSegmentMetadata("test", tombstone.getId());
Assert.assertNotNull(availableSegmentMetadata);
// fetching metadata for tombstone segment shouldn't mark it for refresh
Assert.assertFalse(schema.getSegmentsNeedingRefresh().contains(tombstone.getId()));

Set<AvailableSegmentMetadata> metadatas = new HashSet<>();
schema.iterateSegmentMetadata().forEachRemaining(metadatas::add);

Assert.assertEquals(1, metadatas.stream().filter(metadata -> metadata.getSegment().isTombstone()).count());

// iterating over the entire metadata doesn't cause the tombstone to be marked for refresh
Assert.assertFalse(schema.getSegmentsNeedingRefresh().contains(tombstone.getId()));
}

@Test
@@ -2384,6 +2420,27 @@ public void refresh(Set<SegmentId> segmentsToRefresh, Set<String> dataSourcesToRefresh)

Assert.assertTrue(schema.getSegmentsNeedingRefresh().contains(segments.get(1).getId()));
Assert.assertFalse(schema.getSegmentsNeedingRefresh().contains(segments.get(2).getId()));

AvailableSegmentMetadata availableSegmentMetadata =
schema.getAvailableSegmentMetadata(dataSource, segments.get(0).getId());

Assert.assertNotNull(availableSegmentMetadata);
// fetching metadata for unused segment shouldn't mark it for refresh
Assert.assertFalse(schema.getSegmentsNeedingRefresh().contains(segments.get(0).getId()));

Set<AvailableSegmentMetadata> metadatas = new HashSet<>();
schema.iterateSegmentMetadata().forEachRemaining(metadatas::add);

Assert.assertEquals(
1,
metadatas.stream()
.filter(
metadata ->
metadata.getSegment().getId().equals(segments.get(0).getId())).count()
);

// iterating over the entire metadata doesn't cause the unused segment to be marked for refresh
Assert.assertFalse(schema.getSegmentsNeedingRefresh().contains(segments.get(0).getId()));
}

private void verifyFooDSSchema(CoordinatorSegmentMetadataCache schema, int columns)