Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -458,6 +458,13 @@ public int getTotalSegments()
@VisibleForTesting
public void addSegment(final DruidServerMetadata server, final DataSegment segment)
{
// Skip adding tombstone segment to the cache. These segments lack data or column information.
// Additionally, segment metadata queries, which are not yet implemented for tombstone segments
// (see: https://github.com/apache/druid/pull/12137) do not provide metadata for tombstones,
// leading to indefinite refresh attempts for these segments.
if (segment.isTombstone()) {
return;
}
// Get lock first so that we won't wait in ConcurrentMap.compute().
synchronized (lock) {
// someday we could hypothetically remove broker special casing, whenever BrokerServerView supports tracking
Expand Down Expand Up @@ -530,6 +537,10 @@ public void addSegment(final DruidServerMetadata server, final DataSegment segme
@VisibleForTesting
public void removeSegment(final DataSegment segment)
{
// tombstone segments are not present in the cache
if (segment.isTombstone()) {
return;
}
// Get lock first so that we won't wait in ConcurrentMap.compute().
synchronized (lock) {
log.debug("Segment [%s] is gone.", segment.getId());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.SegmentId;
import org.apache.druid.timeline.partition.LinearShardSpec;
import org.apache.druid.timeline.partition.TombstoneShardSpec;
import org.easymock.EasyMock;
import org.joda.time.Period;
import org.junit.After;
Expand Down Expand Up @@ -2214,6 +2215,77 @@ protected void coldDatasourceSchemaExec()
Assert.assertEquals(0, latch.getCount());
}

@Test
public void testTombstoneSegmentIsNotAdded() throws InterruptedException
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: This test case is very similar to BSMC#testTombstoneSegmentIsNotAdded(). Can we do a bit of refactor to remove redundancy.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The test is trying ensure that tombstone segments are not added to the Broker and Coordinator cache instances.
Refactoring the test wouldn't be very beneficial since,

  • Timeline used in the two caches is different.
  • org.apache.druid.segment.metadata.AbstractSegmentMetadataCache#getSegmentMetadataSnapshot implementation differs between the two instances.

{
String datasource = "newSegmentAddTest";
CountDownLatch addSegmentLatch = new CountDownLatch(1);

CoordinatorSegmentMetadataCache schema = new CoordinatorSegmentMetadataCache(
getQueryLifecycleFactory(walker),
serverView,
SEGMENT_CACHE_CONFIG_DEFAULT,
new NoopEscalator(),
new InternalQueryConfig(),
new NoopServiceEmitter(),
segmentSchemaCache,
backFillQueue,
sqlSegmentsMetadataManager,
segmentsMetadataManagerConfigSupplier
)
{
@Override
public void addSegment(final DruidServerMetadata server, final DataSegment segment)
{
super.addSegment(server, segment);
if (datasource.equals(segment.getDataSource())) {
addSegmentLatch.countDown();
}
}
};

schema.onLeaderStart();
schema.awaitInitialization();

DataSegment segment = new DataSegment(
datasource,
Intervals.of("2001/2002"),
"1",
Collections.emptyMap(),
Collections.emptyList(),
Collections.emptyList(),
TombstoneShardSpec.INSTANCE,
null,
null,
0
);

Assert.assertEquals(6, schema.getTotalSegments());

serverView.addSegment(segment, ServerType.HISTORICAL);
Assert.assertTrue(addSegmentLatch.await(1, TimeUnit.SECONDS));
Assert.assertEquals(0, addSegmentLatch.getCount());

Assert.assertEquals(6, schema.getTotalSegments());
List<AvailableSegmentMetadata> metadatas = schema
.getSegmentMetadataSnapshot()
.values()
.stream()
.filter(metadata -> datasource.equals(metadata.getSegment().getDataSource()))
.collect(Collectors.toList());
Assert.assertEquals(0, metadatas.size());

serverView.removeSegment(segment, ServerType.HISTORICAL);
Assert.assertEquals(6, schema.getTotalSegments());
metadatas = schema
.getSegmentMetadataSnapshot()
.values()
.stream()
.filter(metadata -> datasource.equals(metadata.getSegment().getDataSource()))
.collect(Collectors.toList());
Assert.assertEquals(0, metadatas.size());
}

private void verifyFooDSSchema(CoordinatorSegmentMetadataCache schema, int columns)
{
final DataSourceInformation fooDs = schema.getDatasource("foo");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@
import org.apache.druid.timeline.SegmentId;
import org.apache.druid.timeline.partition.LinearShardSpec;
import org.apache.druid.timeline.partition.NumberedShardSpec;
import org.apache.druid.timeline.partition.TombstoneShardSpec;
import org.easymock.EasyMock;
import org.joda.time.Period;
import org.junit.After;
Expand Down Expand Up @@ -1136,4 +1137,73 @@ public void testNoDatasourceSchemaWhenNoSegmentMetadata() throws InterruptedExce

Assert.assertNull(schema.getDatasource("foo"));
}

@Test
public void testTombstoneSegmentIsNotAdded() throws InterruptedException
{
String datasource = "newSegmentAddTest";
CountDownLatch addSegmentLatch = new CountDownLatch(1);
BrokerSegmentMetadataCache schema = new BrokerSegmentMetadataCache(
CalciteTests.createMockQueryLifecycleFactory(walker, conglomerate),
serverView,
BrokerSegmentMetadataCacheConfig.create(),
new NoopEscalator(),
new InternalQueryConfig(),
new NoopServiceEmitter(),
new PhysicalDatasourceMetadataFactory(globalTableJoinable, segmentManager),
new NoopCoordinatorClient(),
CentralizedDatasourceSchemaConfig.create()
)
{
@Override
public void addSegment(final DruidServerMetadata server, final DataSegment segment)
{
super.addSegment(server, segment);
if (datasource.equals(segment.getDataSource())) {
addSegmentLatch.countDown();
}
}
};

schema.start();
schema.awaitInitialization();

DataSegment segment = new DataSegment(
datasource,
Intervals.of("2001/2002"),
"1",
Collections.emptyMap(),
Collections.emptyList(),
Collections.emptyList(),
TombstoneShardSpec.INSTANCE,
null,
null,
0
);

Assert.assertEquals(6, schema.getTotalSegments());

serverView.addSegment(segment, ServerType.HISTORICAL);
Assert.assertTrue(addSegmentLatch.await(1, TimeUnit.SECONDS));
Assert.assertEquals(0, addSegmentLatch.getCount());

Assert.assertEquals(6, schema.getTotalSegments());
List<AvailableSegmentMetadata> metadatas = schema
.getSegmentMetadataSnapshot()
.values()
.stream()
.filter(metadata -> datasource.equals(metadata.getSegment().getDataSource()))
.collect(Collectors.toList());
Assert.assertEquals(0, metadatas.size());

serverView.removeSegment(segment, ServerType.HISTORICAL);
Assert.assertEquals(6, schema.getTotalSegments());
metadatas = schema
.getSegmentMetadataSnapshot()
.values()
.stream()
.filter(metadata -> datasource.equals(metadata.getSegment().getDataSource()))
.collect(Collectors.toList());
Assert.assertEquals(0, metadatas.size());
}
}