Skip to content

Re-enable centalized schema with no query embedded tests#18561

Merged
kfaraz merged 3 commits intoapache:masterfrom
uds5501:fix-flaky-centralized-schema-embedded-tests
Sep 23, 2025
Merged

Re-enable centalized schema with no query embedded tests#18561
kfaraz merged 3 commits intoapache:masterfrom
uds5501:fix-flaky-centralized-schema-embedded-tests

Conversation

@uds5501
Copy link
Copy Markdown
Contributor

@uds5501 uds5501 commented Sep 22, 2025

As follow-up of #18497 , this PR re-enables the embedded tests in CentralizedSchemaMetadataQueryDisabledTest

  • The earlier failures were because if the coordinator returns metadata in data source information, the segment cache skips the update of the relevant segment ids.
  • To tackle this, we can wait on the skipped segment count metric instead (introduced in this PR).

Weirdly, this is not unblocking CentralizedSchemaPublishFailureTest (the no. of segments skipped and the no. of segments refreshed seems to be changing erratically in tests and is never what we want, will take a look at those tests in separate PR.)

Copy link
Copy Markdown
Contributor

@kfaraz kfaraz left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good find, @uds5501 !!

* to be queryable by Brokers.
*/
public void waitForAllSegmentsToBeAvailable(String dataSource, EmbeddedCoordinator coordinator, EmbeddedBroker broker)
public void waitForAllSegmentsToBeAvailable(
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We shouldn't change this method. Please add a new method which waits on the new metric.

final String taskId = IdUtils.getRandomId();
cluster.callApi().runTask(taskBuilder.dataSource(dataSource).withId(taskId), overlord);
cluster.callApi().waitForAllSegmentsToBeAvailable(dataSource, coordinator, broker);
boolean useCentralizedSchema = Boolean.parseBoolean((String) cluster.getCommonProperties().getOrDefault("druid.centralizedDatasourceSchema.enabled", "false"));
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
boolean useCentralizedSchema = Boolean.parseBoolean((String) cluster.getCommonProperties().getOrDefault("druid.centralizedDatasourceSchema.enabled", "false"));
boolean useCentralizedSchema = Boolean.parseBoolean(cluster.getCommonProperties().getProperty("druid.centralizedDatasourceSchema.enabled", "false"));

.addCommonProperty("druid.centralizedDatasourceSchema.backFillEnabled", "true")
.addCommonProperty("druid.centralizedDatasourceSchema.backFillPeriod", "500")
.addCommonProperty("druid.coordinator.segmentMetadata.disableSegmentMetadataQueries", "true")
.addCommonProperty("druid.sql.planner.metadataRefreshPeriod", "PT0.1s")
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This property is already added to EmbeddedBroker. Do we need to add it here too?

polledDataSourceMetadata.forEach(this::updateDSMetadata);

// Remove segments of the datasource from refresh list for which we received schema from the Coordinator.
final Map<String, Integer> segmentsSkippedPerDatasource = new HashMap<>();
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
final Map<String, Integer> segmentsSkippedPerDatasource = new HashMap<>();
final Map<String, Integer> datasourceToNumSegmentsSkipped = new HashMap<>();

Comment on lines +239 to +240
emitMetric(Metric.BROKER_SEGMENTS_SKIPPED_REFRESH, count,
new ServiceMetricEvent.Builder().setDimension(DruidMetrics.DATASOURCE, dataSource));
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please put each arg in a separate line.

segmentsToRefresh.removeIf(segmentId -> polledDataSourceMetadata.containsKey(segmentId.getDataSource()));

// Emit metrics per datasource
segmentsSkippedPerDatasource.forEach((dataSource, count) -> {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: Lambda can be simplified

polledDataSourceMetadata.forEach(this::updateDSMetadata);

// Remove segments of the datasource from refresh list for which we received schema from the Coordinator.
final Map<String, Integer> segmentsSkippedPerDatasource = new HashMap<>();
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please put the entire new logic in a separate method.

Copy link
Copy Markdown
Contributor

@kfaraz kfaraz left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the cleanup, @uds5501 ! Left a final comment.

Comment on lines +230 to +239
// Emit metrics per datasource
datasourceToNumSegmentsSkipped.forEach(
(dataSource, count) ->
emitMetric(
Metric.BROKER_SEGMENTS_SKIPPED_REFRESH,
count,
new ServiceMetricEvent.Builder().setDimension(
DruidMetrics.DATASOURCE,
dataSource))
);
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should be put in the new method as well.

}
}

private Map<String, Integer> getDatasourceToSegmentsSkipped(
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
private Map<String, Integer> getDatasourceToSegmentsSkipped(
private void emitMetricForSkippedSegments(

datasourceToNumSegmentsSkipped.merge(segmentId.getDataSource(), 1, Integer::sum);
}
}
return datasourceToNumSegmentsSkipped;
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Don't return anything. Perform the metric emission here itself.

Comment on lines +236 to +238
new ServiceMetricEvent.Builder().setDimension(
DruidMetrics.DATASOURCE,
dataSource))
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
new ServiceMetricEvent.Builder().setDimension(
DruidMetrics.DATASOURCE,
dataSource))
new ServiceMetricEvent.Builder().setDimension(DruidMetrics.DATASOURCE, dataSource)
)

@uds5501 uds5501 requested a review from kfaraz September 23, 2025 05:41
@uds5501
Copy link
Copy Markdown
Contributor Author

uds5501 commented Sep 23, 2025

Thanks for your review @kfaraz , I've accommodated the review comments, PTAL

@kfaraz kfaraz merged commit 9f2d682 into apache:master Sep 23, 2025
60 checks passed
@cecemei cecemei added this to the 35.0.0 milestone Oct 21, 2025
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants