diff --git a/server/src/main/java/org/apache/druid/segment/metadata/AbstractSegmentMetadataCache.java b/server/src/main/java/org/apache/druid/segment/metadata/AbstractSegmentMetadataCache.java index 88e6ee97b983..06ce009c8b2b 100644 --- a/server/src/main/java/org/apache/druid/segment/metadata/AbstractSegmentMetadataCache.java +++ b/server/src/main/java/org/apache/druid/segment/metadata/AbstractSegmentMetadataCache.java @@ -458,6 +458,13 @@ public int getTotalSegments() @VisibleForTesting public void addSegment(final DruidServerMetadata server, final DataSegment segment) { + // Skip adding tombstone segment to the cache. These segments lack data or column information. + // Additionally, segment metadata queries, which are not yet implemented for tombstone segments + // (see: https://github.com/apache/druid/pull/12137) do not provide metadata for tombstones, + // leading to indefinite refresh attempts for these segments. + if (segment.isTombstone()) { + return; + } // Get lock first so that we won't wait in ConcurrentMap.compute(). synchronized (lock) { // someday we could hypothetically remove broker special casing, whenever BrokerServerView supports tracking @@ -530,6 +537,10 @@ public void addSegment(final DruidServerMetadata server, final DataSegment segme @VisibleForTesting public void removeSegment(final DataSegment segment) { + // tombstone segments are not present in the cache + if (segment.isTombstone()) { + return; + } // Get lock first so that we won't wait in ConcurrentMap.compute(). synchronized (lock) { log.debug("Segment [%s] is gone.", segment.getId()); diff --git a/server/src/test/java/org/apache/druid/segment/metadata/CoordinatorSegmentMetadataCacheTest.java b/server/src/test/java/org/apache/druid/segment/metadata/CoordinatorSegmentMetadataCacheTest.java index 8fbc78a74128..772a79ae0ad1 100644 --- a/server/src/test/java/org/apache/druid/segment/metadata/CoordinatorSegmentMetadataCacheTest.java +++ b/server/src/test/java/org/apache/druid/segment/metadata/CoordinatorSegmentMetadataCacheTest.java @@ -81,6 +81,7 @@ import org.apache.druid.timeline.DataSegment; import org.apache.druid.timeline.SegmentId; import org.apache.druid.timeline.partition.LinearShardSpec; +import org.apache.druid.timeline.partition.TombstoneShardSpec; import org.easymock.EasyMock; import org.joda.time.Period; import org.junit.After; @@ -2214,6 +2215,77 @@ protected void coldDatasourceSchemaExec() Assert.assertEquals(0, latch.getCount()); } + @Test + public void testTombstoneSegmentIsNotAdded() throws InterruptedException + { + String datasource = "newSegmentAddTest"; + CountDownLatch addSegmentLatch = new CountDownLatch(1); + + CoordinatorSegmentMetadataCache schema = new CoordinatorSegmentMetadataCache( + getQueryLifecycleFactory(walker), + serverView, + SEGMENT_CACHE_CONFIG_DEFAULT, + new NoopEscalator(), + new InternalQueryConfig(), + new NoopServiceEmitter(), + segmentSchemaCache, + backFillQueue, + sqlSegmentsMetadataManager, + segmentsMetadataManagerConfigSupplier + ) + { + @Override + public void addSegment(final DruidServerMetadata server, final DataSegment segment) + { + super.addSegment(server, segment); + if (datasource.equals(segment.getDataSource())) { + addSegmentLatch.countDown(); + } + } + }; + + schema.onLeaderStart(); + schema.awaitInitialization(); + + DataSegment segment = new DataSegment( + datasource, + Intervals.of("2001/2002"), + "1", + Collections.emptyMap(), + Collections.emptyList(), + Collections.emptyList(), + TombstoneShardSpec.INSTANCE, + null, + null, + 0 + ); + + Assert.assertEquals(6, schema.getTotalSegments()); + + serverView.addSegment(segment, ServerType.HISTORICAL); + Assert.assertTrue(addSegmentLatch.await(1, TimeUnit.SECONDS)); + Assert.assertEquals(0, addSegmentLatch.getCount()); + + Assert.assertEquals(6, schema.getTotalSegments()); + List metadatas = schema + .getSegmentMetadataSnapshot() + .values() + .stream() + .filter(metadata -> datasource.equals(metadata.getSegment().getDataSource())) + .collect(Collectors.toList()); + Assert.assertEquals(0, metadatas.size()); + + serverView.removeSegment(segment, ServerType.HISTORICAL); + Assert.assertEquals(6, schema.getTotalSegments()); + metadatas = schema + .getSegmentMetadataSnapshot() + .values() + .stream() + .filter(metadata -> datasource.equals(metadata.getSegment().getDataSource())) + .collect(Collectors.toList()); + Assert.assertEquals(0, metadatas.size()); + } + private void verifyFooDSSchema(CoordinatorSegmentMetadataCache schema, int columns) { final DataSourceInformation fooDs = schema.getDatasource("foo"); diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/schema/BrokerSegmentMetadataCacheTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/schema/BrokerSegmentMetadataCacheTest.java index 65610ce99f28..10504ca49d55 100644 --- a/sql/src/test/java/org/apache/druid/sql/calcite/schema/BrokerSegmentMetadataCacheTest.java +++ b/sql/src/test/java/org/apache/druid/sql/calcite/schema/BrokerSegmentMetadataCacheTest.java @@ -79,6 +79,7 @@ import org.apache.druid.timeline.SegmentId; import org.apache.druid.timeline.partition.LinearShardSpec; import org.apache.druid.timeline.partition.NumberedShardSpec; +import org.apache.druid.timeline.partition.TombstoneShardSpec; import org.easymock.EasyMock; import org.joda.time.Period; import org.junit.After; @@ -1136,4 +1137,73 @@ public void testNoDatasourceSchemaWhenNoSegmentMetadata() throws InterruptedExce Assert.assertNull(schema.getDatasource("foo")); } + + @Test + public void testTombstoneSegmentIsNotAdded() throws InterruptedException + { + String datasource = "newSegmentAddTest"; + CountDownLatch addSegmentLatch = new CountDownLatch(1); + BrokerSegmentMetadataCache schema = new BrokerSegmentMetadataCache( + CalciteTests.createMockQueryLifecycleFactory(walker, conglomerate), + serverView, + BrokerSegmentMetadataCacheConfig.create(), + new NoopEscalator(), + new InternalQueryConfig(), + new NoopServiceEmitter(), + new PhysicalDatasourceMetadataFactory(globalTableJoinable, segmentManager), + new NoopCoordinatorClient(), + CentralizedDatasourceSchemaConfig.create() + ) + { + @Override + public void addSegment(final DruidServerMetadata server, final DataSegment segment) + { + super.addSegment(server, segment); + if (datasource.equals(segment.getDataSource())) { + addSegmentLatch.countDown(); + } + } + }; + + schema.start(); + schema.awaitInitialization(); + + DataSegment segment = new DataSegment( + datasource, + Intervals.of("2001/2002"), + "1", + Collections.emptyMap(), + Collections.emptyList(), + Collections.emptyList(), + TombstoneShardSpec.INSTANCE, + null, + null, + 0 + ); + + Assert.assertEquals(6, schema.getTotalSegments()); + + serverView.addSegment(segment, ServerType.HISTORICAL); + Assert.assertTrue(addSegmentLatch.await(1, TimeUnit.SECONDS)); + Assert.assertEquals(0, addSegmentLatch.getCount()); + + Assert.assertEquals(6, schema.getTotalSegments()); + List metadatas = schema + .getSegmentMetadataSnapshot() + .values() + .stream() + .filter(metadata -> datasource.equals(metadata.getSegment().getDataSource())) + .collect(Collectors.toList()); + Assert.assertEquals(0, metadatas.size()); + + serverView.removeSegment(segment, ServerType.HISTORICAL); + Assert.assertEquals(6, schema.getTotalSegments()); + metadatas = schema + .getSegmentMetadataSnapshot() + .values() + .stream() + .filter(metadata -> datasource.equals(metadata.getSegment().getDataSource())) + .collect(Collectors.toList()); + Assert.assertEquals(0, metadatas.size()); + } }