diff --git a/docs/operations/metrics.md b/docs/operations/metrics.md index 7b56196f717f..55a6dd449b3f 100644 --- a/docs/operations/metrics.md +++ b/docs/operations/metrics.md @@ -441,6 +441,7 @@ These metrics are emitted by the Druid Coordinator in every run of the correspon |`serverview/sync/unstableTime`|Time in milliseconds for which the Coordinator has been failing to sync with a segment-loading server. Emitted only when [HTTP-based server view](../configuration/index.md#segment-management) is enabled.|`server`, `tier`|Not emitted for synced servers.| |`metadatacache/init/time`|Time taken to initialize the coordinator segment metadata cache.||Depends on the number of segments.| |`segment/schemaCache/refresh/count`|Number of segments for which schema was refreshed in coordinator segment schema cache.|`dataSource`|| +|`segment/schemaCache/refreshSkipped/count`|Number of segments for which schema refresh was skipped because their schema was already present in the datasource schema polled from the Coordinator.|`dataSource`|| |`segment/schemaCache/dataSource/removed`|Emitted when a datasource is removed from the Broker cache due to segments being marked as unused.|`dataSource`|| |`segment/schemaCache/refresh/time`|Time taken to refresh segments in coordinator segment schema cache.|`dataSource`|| |`segment/schemaCache/backfill/count`|Number of segments for which schema was back filled in the database.|`dataSource`|| diff --git a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/compact/CompactionTestBase.java b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/compact/CompactionTestBase.java index 777cb1a48041..cbf88f887726 100644 --- a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/compact/CompactionTestBase.java +++ b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/compact/CompactionTestBase.java @@ -72,7 +72,12 @@ protected String runTask(TaskBuilder taskBuilder) { final String taskId = IdUtils.getRandomId(); 
cluster.callApi().runTask(taskBuilder.dataSource(dataSource).withId(taskId), overlord); - cluster.callApi().waitForAllSegmentsToBeAvailable(dataSource, coordinator, broker); + boolean useCentralizedSchema = Boolean.parseBoolean(cluster.getCommonProperties().getProperty("druid.centralizedDatasourceSchema.enabled", "false")); + if (useCentralizedSchema) { + cluster.callApi().waitForAllSegmentsToBeAvailableWithCentralizedSchema(dataSource, coordinator, broker); + } else { + cluster.callApi().waitForAllSegmentsToBeAvailable(dataSource, coordinator, broker); + } return taskId; } diff --git a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/schema/CentralizedSchemaMetadataQueryDisabledTest.java b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/schema/CentralizedSchemaMetadataQueryDisabledTest.java index e0d08108d328..2c701c762138 100644 --- a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/schema/CentralizedSchemaMetadataQueryDisabledTest.java +++ b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/schema/CentralizedSchemaMetadataQueryDisabledTest.java @@ -23,7 +23,6 @@ import org.apache.druid.testing.embedded.compact.CompactionSparseColumnTest; import org.apache.druid.testing.embedded.compact.CompactionTaskTest; import org.apache.druid.testing.embedded.indexing.KafkaDataFormatsTest; -import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Nested; /** @@ -46,7 +45,6 @@ private static EmbeddedDruidCluster configureCluster(EmbeddedDruidCluster cluste } @Nested - @Disabled("Disabled due to issues with compaction task not publishing schema to broker") public class CompactionSparseColumn extends CompactionSparseColumnTest { @Override @@ -57,7 +55,6 @@ protected EmbeddedDruidCluster createCluster() } @Nested - @Disabled("Disabled due to issues with compaction task not publishing schema to broker") public class CompactionTask extends CompactionTaskTest { @Override @@ -68,7 +65,6 @@ protected 
EmbeddedDruidCluster createCluster() } @Nested - @Disabled("Disabled due to issues with compaction task not publishing schema to broker") public class KafkaDataFormats extends KafkaDataFormatsTest { @Override diff --git a/server/src/main/java/org/apache/druid/segment/metadata/Metric.java b/server/src/main/java/org/apache/druid/segment/metadata/Metric.java index 08b15f076861..56337a95dc7e 100644 --- a/server/src/main/java/org/apache/druid/segment/metadata/Metric.java +++ b/server/src/main/java/org/apache/druid/segment/metadata/Metric.java @@ -44,6 +44,7 @@ public class Metric // Broker-side metrics public static final String BROKER_POLL_DURATION_MILLIS = PREFIX + "poll/time"; public static final String BROKER_POLL_FAILED = PREFIX + "poll/failed"; + public static final String BROKER_SEGMENTS_SKIPPED_REFRESH = PREFIX + "refreshSkipped/count"; // Schema refresh metrics public static final String STARTUP_DURATION_MILLIS = "metadatacache/init/time"; diff --git a/services/src/test/java/org/apache/druid/testing/embedded/EmbeddedClusterApis.java b/services/src/test/java/org/apache/druid/testing/embedded/EmbeddedClusterApis.java index 73862b3da0be..0d245772cd19 100644 --- a/services/src/test/java/org/apache/druid/testing/embedded/EmbeddedClusterApis.java +++ b/services/src/test/java/org/apache/druid/testing/embedded/EmbeddedClusterApis.java @@ -299,6 +299,29 @@ public void waitForAllSegmentsToBeAvailable(String dataSource, EmbeddedCoordinat ); } + /** + * Waits for all used segments (including overshadowed) of the given datasource + * to be queryable by Brokers when centralized schema is enabled. 
+ */ + public void waitForAllSegmentsToBeAvailableWithCentralizedSchema( + String dataSource, + EmbeddedCoordinator coordinator, + EmbeddedBroker broker + ) + { + final int numSegments = coordinator + .bindings() + .segmentsMetadataStorage() + .retrieveAllUsedSegments(dataSource, Segments.INCLUDING_OVERSHADOWED) + .size(); + + broker.latchableEmitter().waitForEventAggregate( + event -> event.hasMetricName("segment/schemaCache/refreshSkipped/count") + .hasDimension(DruidMetrics.DATASOURCE, dataSource), + agg -> agg.hasSumAtLeast(numSegments) + ); + } + /** * Returns a {@link Closeable} that deletes all the data for the given datasource * on {@link Closeable#close()}. diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/schema/BrokerSegmentMetadataCache.java b/sql/src/main/java/org/apache/druid/sql/calcite/schema/BrokerSegmentMetadataCache.java index 71107797783d..084671d25e4d 100644 --- a/sql/src/main/java/org/apache/druid/sql/calcite/schema/BrokerSegmentMetadataCache.java +++ b/sql/src/main/java/org/apache/druid/sql/calcite/schema/BrokerSegmentMetadataCache.java @@ -222,6 +222,7 @@ public void refresh(final Set segmentsToRefresh, final Set da // update datasource metadata in the cache polledDataSourceMetadata.forEach(this::updateDSMetadata); + emitMetricForSkippedSegments(segmentsToRefresh, polledDataSourceMetadata); // Remove segments of the datasource from refresh list for which we received schema from the Coordinator. 
segmentsToRefresh.removeIf(segmentId -> polledDataSourceMetadata.containsKey(segmentId.getDataSource())); @@ -277,6 +278,29 @@ public void refresh(final Set<SegmentId> segmentsToRefresh, final Set<String> da } } + private void emitMetricForSkippedSegments( + Set<SegmentId> segmentsToRefresh, + Map<String, PhysicalDatasourceMetadata> polledDataSourceMetadata + ) + { + final Map<String, Integer> datasourceToNumSegmentsSkipped = new HashMap<>(); + + for (SegmentId segmentId : segmentsToRefresh) { + if (polledDataSourceMetadata.containsKey(segmentId.getDataSource())) { + datasourceToNumSegmentsSkipped.merge(segmentId.getDataSource(), 1, Integer::sum); + } + } + + datasourceToNumSegmentsSkipped.forEach( + (dataSource, count) -> + emitMetric( + Metric.BROKER_SEGMENTS_SKIPPED_REFRESH, + count, + new ServiceMetricEvent.Builder().setDimension(DruidMetrics.DATASOURCE, dataSource) + ) + ); + } + @Override protected void removeSegmentAction(SegmentId segmentId) {