diff --git a/server/src/main/java/org/apache/druid/segment/metadata/AbstractSegmentMetadataCache.java b/server/src/main/java/org/apache/druid/segment/metadata/AbstractSegmentMetadataCache.java
index d918ec5e3f29..99d965ec643e 100644
--- a/server/src/main/java/org/apache/druid/segment/metadata/AbstractSegmentMetadataCache.java
+++ b/server/src/main/java/org/apache/druid/segment/metadata/AbstractSegmentMetadataCache.java
@@ -102,6 +102,13 @@
*
* This class has an abstract method {@link #refresh(Set, Set)} which the child class must override
* with the logic to build and cache table schema.
+ *
+ * Note on handling tombstone segments:
+ * These segments lack data or column information.
+ * Additionally, segment metadata queries, which are not yet implemented for tombstone segments
+ * (see: https://github.com/apache/druid/pull/12137), do not provide metadata for tombstones,
+ * leading to indefinite refresh attempts for these segments.
+ * Therefore, these segments are never added to the set of segments being refreshed.
*
* @param The type of information associated with the data source, which must extend {@link DataSourceInformation}.
*/
@@ -478,13 +485,6 @@ public int getTotalSegments()
@VisibleForTesting
public void addSegment(final DruidServerMetadata server, final DataSegment segment)
{
- // Skip adding tombstone segment to the cache. These segments lack data or column information.
- // Additionally, segment metadata queries, which are not yet implemented for tombstone segments
- // (see: https://github.com/apache/druid/pull/12137) do not provide metadata for tombstones,
- // leading to indefinite refresh attempts for these segments.
- if (segment.isTombstone()) {
- return;
- }
// Get lock first so that we won't wait in ConcurrentMap.compute().
synchronized (lock) {
// someday we could hypothetically remove broker special casing, whenever BrokerServerView supports tracking
@@ -511,7 +511,11 @@ public void addSegment(final DruidServerMetadata server, final DataSegment segme
segmentMetadata = AvailableSegmentMetadata
.builder(segment, isRealtime, ImmutableSet.of(server), null, DEFAULT_NUM_ROWS) // Added without needing a refresh
.build();
- markSegmentAsNeedRefresh(segment.getId());
+ if (segment.isTombstone()) {
+ log.debug("Skipping refresh for tombstone segment.");
+ } else {
+ markSegmentAsNeedRefresh(segment.getId());
+ }
if (!server.isSegmentReplicationTarget()) {
log.debug("Added new mutable segment [%s].", segment.getId());
markSegmentAsMutable(segment.getId());
@@ -557,10 +561,6 @@ public void addSegment(final DruidServerMetadata server, final DataSegment segme
@VisibleForTesting
public void removeSegment(final DataSegment segment)
{
- // tombstone segments are not present in the cache
- if (segment.isTombstone()) {
- return;
- }
// Get lock first so that we won't wait in ConcurrentMap.compute().
synchronized (lock) {
log.debug("Segment [%s] is gone.", segment.getId());
diff --git a/server/src/main/java/org/apache/druid/segment/metadata/CoordinatorSegmentMetadataCache.java b/server/src/main/java/org/apache/druid/segment/metadata/CoordinatorSegmentMetadataCache.java
index 321c33fa1dbf..24489e60acdc 100644
--- a/server/src/main/java/org/apache/druid/segment/metadata/CoordinatorSegmentMetadataCache.java
+++ b/server/src/main/java/org/apache/druid/segment/metadata/CoordinatorSegmentMetadataCache.java
@@ -374,9 +374,7 @@ public Iterator iterateSegmentMetadata()
.withNumRows(metadata.get().getNumRows())
.build();
} else {
- // mark it for refresh, however, this case shouldn't arise by design
- markSegmentAsNeedRefresh(segmentId);
- log.debug("SchemaMetadata for segmentId[%s] is absent.", segmentId);
+ markSegmentForRefreshIfNeeded(availableSegmentMetadata.getSegment());
return availableSegmentMetadata;
}
}
@@ -403,9 +401,7 @@ public AvailableSegmentMetadata getAvailableSegmentMetadata(String datasource, S
.withNumRows(metadata.get().getNumRows())
.build();
} else {
- // mark it for refresh, however, this case shouldn't arise by design
- markSegmentAsNeedRefresh(segmentId);
- log.debug("SchemaMetadata for segmentId [%s] is absent.", segmentId);
+ markSegmentForRefreshIfNeeded(availableSegmentMetadata.getSegment());
}
return availableSegmentMetadata;
}
@@ -686,22 +682,14 @@ public RowSignature buildDataSourceRowSignature(final String dataSource)
final Map columnTypes = new LinkedHashMap<>();
if (segmentsMap != null && !segmentsMap.isEmpty()) {
- for (SegmentId segmentId : segmentsMap.keySet()) {
+ for (Map.Entry entry : segmentsMap.entrySet()) {
+ SegmentId segmentId = entry.getKey();
Optional optionalSchema = segmentSchemaCache.getSchemaForSegment(segmentId);
if (optionalSchema.isPresent()) {
RowSignature rowSignature = optionalSchema.get().getSchemaPayload().getRowSignature();
mergeRowSignature(columnTypes, rowSignature);
} else {
- log.debug("SchemaMetadata for segmentId [%s] is absent.", segmentId);
-
- ImmutableDruidDataSource druidDataSource =
- sqlSegmentsMetadataManager.getImmutableDataSourceWithUsedSegments(segmentId.getDataSource());
-
- if (druidDataSource != null && druidDataSource.getSegment(segmentId) != null) {
- // mark it for refresh only if it is used
- // however, this case shouldn't arise by design
- markSegmentAsNeedRefresh(segmentId);
- }
+ markSegmentForRefreshIfNeeded(entry.getValue().getSegment());
}
}
} else {
@@ -876,4 +864,32 @@ Optional mergeOrCreateRowSignature(
return Optional.empty();
}
}
+
+ /**
+   * A segment schema can go missing. To ensure smooth functioning, the segment is marked for refresh.
+ * It need not be refreshed in the following scenarios:
+   * - Tombstone segments, since they do not have any schema.
+   * - Unused segments which haven't yet been removed from the cache.
+ * Any other scenario needs investigation.
+ */
+ private void markSegmentForRefreshIfNeeded(DataSegment segment)
+ {
+ SegmentId id = segment.getId();
+
+ log.debug("SchemaMetadata for segmentId [%s] is absent.", id);
+
+ if (segment.isTombstone()) {
+ log.debug("Skipping refresh for tombstone segment [%s].", id);
+ return;
+ }
+
+ ImmutableDruidDataSource druidDataSource =
+ sqlSegmentsMetadataManager.getImmutableDataSourceWithUsedSegments(segment.getDataSource());
+
+ if (druidDataSource != null && druidDataSource.getSegment(id) != null) {
+ markSegmentAsNeedRefresh(id);
+ } else {
+ log.debug("Skipping refresh for unused segment [%s].", id);
+ }
+ }
}
diff --git a/server/src/test/java/org/apache/druid/segment/metadata/CoordinatorSegmentMetadataCacheTest.java b/server/src/test/java/org/apache/druid/segment/metadata/CoordinatorSegmentMetadataCacheTest.java
index 0c099cb551cb..22b0890e855e 100644
--- a/server/src/test/java/org/apache/druid/segment/metadata/CoordinatorSegmentMetadataCacheTest.java
+++ b/server/src/test/java/org/apache/druid/segment/metadata/CoordinatorSegmentMetadataCacheTest.java
@@ -32,6 +32,7 @@
import org.apache.druid.client.ImmutableDruidDataSource;
import org.apache.druid.client.InternalQueryConfig;
import org.apache.druid.data.input.InputRow;
+import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.Pair;
import org.apache.druid.java.util.common.StringUtils;
@@ -2220,74 +2221,109 @@ protected void coldDatasourceSchemaExec()
}
@Test
- public void testTombstoneSegmentIsNotAdded() throws InterruptedException
+ public void testTombstoneSegmentIsNotRefreshed() throws IOException
{
- String datasource = "newSegmentAddTest";
- CountDownLatch addSegmentLatch = new CountDownLatch(1);
+ String brokerInternalQueryConfigJson = "{\"context\": { \"priority\": 5} }";
+
+ TestHelper.makeJsonMapper();
+ InternalQueryConfig internalQueryConfig = MAPPER.readValue(
+ MAPPER.writeValueAsString(
+ MAPPER.readValue(brokerInternalQueryConfigJson, InternalQueryConfig.class)
+ ),
+ InternalQueryConfig.class
+ );
+
+ QueryLifecycleFactory factoryMock = EasyMock.createMock(QueryLifecycleFactory.class);
+ QueryLifecycle lifecycleMock = EasyMock.createMock(QueryLifecycle.class);
CoordinatorSegmentMetadataCache schema = new CoordinatorSegmentMetadataCache(
- getQueryLifecycleFactory(walker),
+ factoryMock,
serverView,
SEGMENT_CACHE_CONFIG_DEFAULT,
new NoopEscalator(),
- new InternalQueryConfig(),
+ internalQueryConfig,
new NoopServiceEmitter(),
segmentSchemaCache,
backFillQueue,
sqlSegmentsMetadataManager,
segmentsMetadataManagerConfigSupplier
- )
- {
- @Override
- public void addSegment(final DruidServerMetadata server, final DataSegment segment)
- {
- super.addSegment(server, segment);
- if (datasource.equals(segment.getDataSource())) {
- addSegmentLatch.countDown();
- }
- }
- };
+ );
- schema.onLeaderStart();
- schema.awaitInitialization();
+ Map queryContext = ImmutableMap.of(
+ QueryContexts.PRIORITY_KEY, 5,
+ QueryContexts.BROKER_PARALLEL_MERGE_KEY, false
+ );
- DataSegment segment = new DataSegment(
- datasource,
- Intervals.of("2001/2002"),
- "1",
- Collections.emptyMap(),
- Collections.emptyList(),
- Collections.emptyList(),
- TombstoneShardSpec.INSTANCE,
- null,
+ DataSegment segment = newSegment("test", 0);
+ DataSegment tombstone = DataSegment.builder()
+ .dataSource("test")
+ .interval(Intervals.of("2012-01-01/2012-01-02"))
+ .version(DateTimes.of("2012-01-01T11:22:33.444Z").toString())
+ .shardSpec(new TombstoneShardSpec())
+ .loadSpec(Collections.singletonMap(
+ "type",
+ DataSegment.TOMBSTONE_LOADSPEC_TYPE
+ ))
+ .size(0)
+ .build();
+
+ final DruidServer historicalServer = druidServers.stream()
+ .filter(s -> s.getType().equals(ServerType.HISTORICAL))
+ .findAny()
+ .orElse(null);
+
+ Assert.assertNotNull(historicalServer);
+ final DruidServerMetadata historicalServerMetadata = historicalServer.getMetadata();
+
+ schema.addSegment(historicalServerMetadata, segment);
+ schema.addSegment(historicalServerMetadata, tombstone);
+ Assert.assertFalse(schema.getSegmentsNeedingRefresh().contains(tombstone.getId()));
+
+ List segmentIterable = ImmutableList.of(segment.getId(), tombstone.getId());
+
+ SegmentMetadataQuery expectedMetadataQuery = new SegmentMetadataQuery(
+ new TableDataSource(segment.getDataSource()),
+ new MultipleSpecificSegmentSpec(
+ segmentIterable.stream()
+ .filter(id -> !id.equals(tombstone.getId()))
+ .map(SegmentId::toDescriptor)
+ .collect(Collectors.toList())
+ ),
+ new AllColumnIncluderator(),
+ false,
+ queryContext,
+ EnumSet.of(SegmentMetadataQuery.AnalysisType.AGGREGATORS),
+ false,
null,
- 0
+ null
);
- Assert.assertEquals(6, schema.getTotalSegments());
+ EasyMock.expect(factoryMock.factorize()).andReturn(lifecycleMock).once();
+ EasyMock.expect(lifecycleMock.runSimple(expectedMetadataQuery, AllowAllAuthenticator.ALLOW_ALL_RESULT, Access.OK))
+ .andReturn(QueryResponse.withEmptyContext(Sequences.empty())).once();
- serverView.addSegment(segment, ServerType.HISTORICAL);
- Assert.assertTrue(addSegmentLatch.await(1, TimeUnit.SECONDS));
- Assert.assertEquals(0, addSegmentLatch.getCount());
+ EasyMock.replay(factoryMock, lifecycleMock);
- Assert.assertEquals(6, schema.getTotalSegments());
- List metadatas = schema
- .getSegmentMetadataSnapshot()
- .values()
- .stream()
- .filter(metadata -> datasource.equals(metadata.getSegment().getDataSource()))
- .collect(Collectors.toList());
- Assert.assertEquals(0, metadatas.size());
+ schema.refresh(Collections.singleton(segment.getId()), Collections.singleton("test"));
- serverView.removeSegment(segment, ServerType.HISTORICAL);
- Assert.assertEquals(6, schema.getTotalSegments());
- metadatas = schema
- .getSegmentMetadataSnapshot()
- .values()
- .stream()
- .filter(metadata -> datasource.equals(metadata.getSegment().getDataSource()))
- .collect(Collectors.toList());
- Assert.assertEquals(0, metadatas.size());
+ // verify that metadata query is not issued for tombstone segment
+ EasyMock.verify(factoryMock, lifecycleMock);
+
+ // Verify that datasource schema building logic doesn't mark the tombstone segment for refresh
+ Assert.assertFalse(schema.getSegmentsNeedingRefresh().contains(tombstone.getId()));
+
+ AvailableSegmentMetadata availableSegmentMetadata = schema.getAvailableSegmentMetadata("test", tombstone.getId());
+ Assert.assertNotNull(availableSegmentMetadata);
+ // fetching metadata for tombstone segment shouldn't mark it for refresh
+ Assert.assertFalse(schema.getSegmentsNeedingRefresh().contains(tombstone.getId()));
+
+ Set metadatas = new HashSet<>();
+ schema.iterateSegmentMetadata().forEachRemaining(metadatas::add);
+
+ Assert.assertEquals(1, metadatas.stream().filter(metadata -> metadata.getSegment().isTombstone()).count());
+
+ // iterating over entire metadata doesn't cause tombstone to be marked for refresh
+ Assert.assertFalse(schema.getSegmentsNeedingRefresh().contains(tombstone.getId()));
}
@Test
@@ -2384,6 +2420,27 @@ public void refresh(Set segmentsToRefresh, Set dataSourcesToR
Assert.assertTrue(schema.getSegmentsNeedingRefresh().contains(segments.get(1).getId()));
Assert.assertFalse(schema.getSegmentsNeedingRefresh().contains(segments.get(2).getId()));
+
+ AvailableSegmentMetadata availableSegmentMetadata =
+ schema.getAvailableSegmentMetadata(dataSource, segments.get(0).getId());
+
+ Assert.assertNotNull(availableSegmentMetadata);
+ // fetching metadata for unused segment shouldn't mark it for refresh
+ Assert.assertFalse(schema.getSegmentsNeedingRefresh().contains(segments.get(0).getId()));
+
+ Set metadatas = new HashSet<>();
+ schema.iterateSegmentMetadata().forEachRemaining(metadatas::add);
+
+ Assert.assertEquals(
+ 1,
+ metadatas.stream()
+ .filter(
+ metadata ->
+ metadata.getSegment().getId().equals(segments.get(0).getId())).count()
+ );
+
+    // iterating over entire metadata doesn't cause unused segment to be marked for refresh
+ Assert.assertFalse(schema.getSegmentsNeedingRefresh().contains(segments.get(0).getId()));
}
private void verifyFooDSSchema(CoordinatorSegmentMetadataCache schema, int columns)
diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/schema/BrokerSegmentMetadataCacheTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/schema/BrokerSegmentMetadataCacheTest.java
index d9b24ed011dc..b613c602f633 100644
--- a/sql/src/test/java/org/apache/druid/sql/calcite/schema/BrokerSegmentMetadataCacheTest.java
+++ b/sql/src/test/java/org/apache/druid/sql/calcite/schema/BrokerSegmentMetadataCacheTest.java
@@ -37,6 +37,7 @@
import org.apache.druid.client.coordinator.CoordinatorClient;
import org.apache.druid.client.coordinator.NoopCoordinatorClient;
import org.apache.druid.data.input.InputRow;
+import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.Pair;
import org.apache.druid.java.util.common.guava.Sequences;
@@ -1139,71 +1140,109 @@ public void testNoDatasourceSchemaWhenNoSegmentMetadata() throws InterruptedExce
}
@Test
- public void testTombstoneSegmentIsNotAdded() throws InterruptedException
+ public void testTombstoneSegmentIsNotRefreshed() throws IOException
{
- String datasource = "newSegmentAddTest";
- CountDownLatch addSegmentLatch = new CountDownLatch(1);
+ String brokerInternalQueryConfigJson = "{\"context\": { \"priority\": 5} }";
+
+ TestHelper.makeJsonMapper();
+ InternalQueryConfig internalQueryConfig = MAPPER.readValue(
+ MAPPER.writeValueAsString(
+ MAPPER.readValue(brokerInternalQueryConfigJson, InternalQueryConfig.class)
+ ),
+ InternalQueryConfig.class
+ );
+
+ QueryLifecycleFactory factoryMock = EasyMock.createMock(QueryLifecycleFactory.class);
+ QueryLifecycle lifecycleMock = EasyMock.createMock(QueryLifecycle.class);
+
BrokerSegmentMetadataCache schema = new BrokerSegmentMetadataCache(
- CalciteTests.createMockQueryLifecycleFactory(walker, conglomerate),
+ factoryMock,
serverView,
- BrokerSegmentMetadataCacheConfig.create(),
+ SEGMENT_CACHE_CONFIG_DEFAULT,
new NoopEscalator(),
- new InternalQueryConfig(),
+ internalQueryConfig,
new NoopServiceEmitter(),
new PhysicalDatasourceMetadataFactory(globalTableJoinable, segmentManager),
new NoopCoordinatorClient(),
CentralizedDatasourceSchemaConfig.create()
- )
- {
- @Override
- public void addSegment(final DruidServerMetadata server, final DataSegment segment)
- {
- super.addSegment(server, segment);
- if (datasource.equals(segment.getDataSource())) {
- addSegmentLatch.countDown();
- }
- }
- };
+ );
- schema.start();
- schema.awaitInitialization();
+ Map queryContext = ImmutableMap.of(
+ QueryContexts.PRIORITY_KEY, 5,
+ QueryContexts.BROKER_PARALLEL_MERGE_KEY, false
+ );
- DataSegment segment = new DataSegment(
- datasource,
- Intervals.of("2001/2002"),
- "1",
- Collections.emptyMap(),
- Collections.emptyList(),
- Collections.emptyList(),
- TombstoneShardSpec.INSTANCE,
- null,
+ DataSegment segment = newSegment("test", 0);
+ DataSegment tombstone = DataSegment.builder()
+ .dataSource("test")
+ .interval(Intervals.of("2012-01-01/2012-01-02"))
+ .version(DateTimes.of("2012-01-01T11:22:33.444Z").toString())
+ .shardSpec(new TombstoneShardSpec())
+ .loadSpec(Collections.singletonMap(
+ "type",
+ DataSegment.TOMBSTONE_LOADSPEC_TYPE
+ ))
+ .size(0)
+ .build();
+
+ final ImmutableDruidServer historicalServer = druidServers.stream()
+ .filter(s -> s.getType().equals(ServerType.HISTORICAL))
+ .findAny()
+ .orElse(null);
+
+ Assert.assertNotNull(historicalServer);
+ final DruidServerMetadata historicalServerMetadata = historicalServer.getMetadata();
+
+ schema.addSegment(historicalServerMetadata, segment);
+ schema.addSegment(historicalServerMetadata, tombstone);
+ Assert.assertFalse(schema.getSegmentsNeedingRefresh().contains(tombstone.getId()));
+
+ List segmentIterable = ImmutableList.of(segment.getId(), tombstone.getId());
+
+ SegmentMetadataQuery expectedMetadataQuery = new SegmentMetadataQuery(
+ new TableDataSource(segment.getDataSource()),
+ new MultipleSpecificSegmentSpec(
+ segmentIterable.stream()
+ .filter(id -> !id.equals(tombstone.getId()))
+ .map(SegmentId::toDescriptor)
+ .collect(Collectors.toList())
+ ),
+ new AllColumnIncluderator(),
+ false,
+ queryContext,
+ EnumSet.noneOf(SegmentMetadataQuery.AnalysisType.class),
+ false,
null,
- 0
+ null
);
- Assert.assertEquals(6, schema.getTotalSegments());
+ EasyMock.expect(factoryMock.factorize()).andReturn(lifecycleMock).once();
+ EasyMock.expect(lifecycleMock.runSimple(expectedMetadataQuery, AllowAllAuthenticator.ALLOW_ALL_RESULT, Access.OK))
+ .andReturn(QueryResponse.withEmptyContext(Sequences.empty()));
- serverView.addSegment(segment, ServerType.HISTORICAL);
- Assert.assertTrue(addSegmentLatch.await(1, TimeUnit.SECONDS));
- Assert.assertEquals(0, addSegmentLatch.getCount());
+ EasyMock.replay(factoryMock, lifecycleMock);
- Assert.assertEquals(6, schema.getTotalSegments());
- List metadatas = schema
- .getSegmentMetadataSnapshot()
- .values()
- .stream()
- .filter(metadata -> datasource.equals(metadata.getSegment().getDataSource()))
- .collect(Collectors.toList());
- Assert.assertEquals(0, metadatas.size());
-
- serverView.removeSegment(segment, ServerType.HISTORICAL);
- Assert.assertEquals(6, schema.getTotalSegments());
- metadatas = schema
- .getSegmentMetadataSnapshot()
- .values()
- .stream()
- .filter(metadata -> datasource.equals(metadata.getSegment().getDataSource()))
- .collect(Collectors.toList());
- Assert.assertEquals(0, metadatas.size());
+ Set segmentsToRefresh = new HashSet<>();
+ segmentsToRefresh.add(segment.getId());
+ schema.refresh(segmentsToRefresh, Collections.singleton("test"));
+
+    // verify that metadata query is not issued for tombstone segment
+ EasyMock.verify(factoryMock, lifecycleMock);
+
+ // Verify that datasource schema building logic doesn't mark the tombstone segment for refresh
+ Assert.assertFalse(schema.getSegmentsNeedingRefresh().contains(tombstone.getId()));
+
+ AvailableSegmentMetadata availableSegmentMetadata = schema.getAvailableSegmentMetadata("test", tombstone.getId());
+ Assert.assertNotNull(availableSegmentMetadata);
+ // fetching metadata for tombstone segment shouldn't mark it for refresh
+ Assert.assertFalse(schema.getSegmentsNeedingRefresh().contains(tombstone.getId()));
+
+ Set metadatas = new HashSet<>();
+ schema.iterateSegmentMetadata().forEachRemaining(metadatas::add);
+
+ Assert.assertEquals(1, metadatas.stream().filter(metadata -> metadata.getSegment().isTombstone()).count());
+
+ // iterating over entire metadata doesn't cause tombstone to be marked for refresh
+ Assert.assertFalse(schema.getSegmentsNeedingRefresh().contains(tombstone.getId()));
}
}