diff --git a/docs/operations/metrics.md b/docs/operations/metrics.md index 1a15a5314f88..690738cbc9ea 100644 --- a/docs/operations/metrics.md +++ b/docs/operations/metrics.md @@ -462,6 +462,8 @@ These metrics are emitted by the Druid Coordinator in every run of the correspon |`segment/schemaCache/dataSource/removed`|Emitted when a datasource is removed from the Broker cache due to segments being marked as unused.|`dataSource`|| |`segment/schemaCache/refresh/time`|Time taken to refresh segments in coordinator segment schema cache.|`dataSource`|| |`segment/schemaCache/backfill/count`|Number of segments for which schema was back filled in the database.|`dataSource`|| +|`segment/schemaCache/rowSignature/changed`|Emitted when the cached row signature on the Broker's segment metadata cache for a datasource changes, indicating either schema evolution or a row signature that is flapping between values.|`dataSource`|| +|`segment/schemaCache/rowSignature/column/count`|Number of columns in the row signature on the Broker's segment metadata cache for a datasource when it's initialized or updated.|`dataSource`|| |`segment/schemaCache/realtime/count`|Number of realtime segments for which schema is cached.||Depends on the number of realtime segments in the cluster.| |`segment/schemaCache/used/count`|Number of published used segments for which schema is cached.||Depends on the number of segments in the cluster.| |`segment/schemaCache/usedFingerprint/count`|Number of unique schema fingerprints cached for published used segments.||Depends on the number of distinct schema in the cluster.| diff --git a/server/src/main/java/org/apache/druid/segment/metadata/Metric.java b/server/src/main/java/org/apache/druid/segment/metadata/Metric.java index a29f4a7eafff..96e53a3a98ab 100644 --- a/server/src/main/java/org/apache/druid/segment/metadata/Metric.java +++ b/server/src/main/java/org/apache/druid/segment/metadata/Metric.java @@ -53,6 +53,8 @@ public class Metric public static final String REFRESH_SKIPPED_TOMBSTONES = PREFIX 
+ "refresh/tombstone/count"; public static final String REFRESH_DURATION_MILLIS = PREFIX + "refresh/time"; public static final String DATASOURCE_REMOVED = PREFIX + "dataSource/removed"; + public static final String SCHEMA_ROW_SIGNATURE_CHANGED = PREFIX + "rowSignature/changed"; + public static final String SCHEMA_ROW_SIGNATURE_COLUMN_COUNT = PREFIX + "rowSignature/column/count"; /** * Number of used cold segments in the metadata store. diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/schema/BrokerSegmentMetadataCache.java b/sql/src/main/java/org/apache/druid/sql/calcite/schema/BrokerSegmentMetadataCache.java index 084671d25e4d..26a6c63660d4 100644 --- a/sql/src/main/java/org/apache/druid/sql/calcite/schema/BrokerSegmentMetadataCache.java +++ b/sql/src/main/java/org/apache/druid/sql/calcite/schema/BrokerSegmentMetadataCache.java @@ -358,10 +358,33 @@ private Map queryDataSourceInformation(Set> fetchDataSourceInformation( SEGMENT_CACHE_CONFIG_DEFAULT, new NoopEscalator(), new InternalQueryConfig(), - new NoopServiceEmitter(), + emitter, new PhysicalDatasourceMetadataFactory(globalTableJoinable, segmentManager), coordinatorClient, CentralizedDatasourceSchemaConfig.create() @@ -340,7 +340,7 @@ public ListenableFuture> fetchDataSourceInformation( SEGMENT_CACHE_CONFIG_DEFAULT, new NoopEscalator(), new InternalQueryConfig(), - new NoopServiceEmitter(), + emitter, new PhysicalDatasourceMetadataFactory(globalTableJoinable, segmentManager), coordinatorClient, CentralizedDatasourceSchemaConfig.create() @@ -373,7 +373,7 @@ public void testBrokerPollsAllDSSchema() throws InterruptedException SEGMENT_CACHE_CONFIG_DEFAULT, new NoopEscalator(), new InternalQueryConfig(), - new NoopServiceEmitter(), + emitter, new PhysicalDatasourceMetadataFactory(globalTableJoinable, segmentManager), coordinatorClient, CentralizedDatasourceSchemaConfig.create() @@ -409,7 +409,7 @@ public void testRefreshOnEachCycleCentralizedDatasourceSchemaEnabled() throws In 
SEGMENT_CACHE_CONFIG_DEFAULT, new NoopEscalator(), new InternalQueryConfig(), - new NoopServiceEmitter(), + emitter, new PhysicalDatasourceMetadataFactory(globalTableJoinable, segmentManager), new NoopCoordinatorClient(), config @@ -431,6 +431,10 @@ public void refresh(Set segmentsToRefresh, Set dataSourcesToR refreshLatch.await(WAIT_TIMEOUT_SECS, TimeUnit.SECONDS); + emitter.verifyValue(Metric.SCHEMA_ROW_SIGNATURE_COLUMN_COUNT, Map.of(DruidMetrics.DATASOURCE, "foo"), 6L); + emitter.verifyValue(Metric.SCHEMA_ROW_SIGNATURE_COLUMN_COUNT, Map.of(DruidMetrics.DATASOURCE, "foo2"), 3L); + emitter.verifyValue(Metric.SCHEMA_ROW_SIGNATURE_COLUMN_COUNT, Map.of(DruidMetrics.DATASOURCE, "some_datasource"), 9L); + Assert.assertEquals(0, refreshLatch.getCount()); } @@ -449,7 +453,7 @@ public void testRefreshOnEachCycleCentralizedDatasourceSchemaDisabled() throws I SEGMENT_CACHE_CONFIG_DEFAULT, new NoopEscalator(), new InternalQueryConfig(), - new NoopServiceEmitter(), + emitter, new PhysicalDatasourceMetadataFactory(globalTableJoinable, segmentManager), new NoopCoordinatorClient(), CentralizedDatasourceSchemaConfig.create() @@ -726,7 +730,7 @@ public void testAllDatasourcesRebuiltOnDatasourceRemoval() throws IOException, I SEGMENT_CACHE_CONFIG_DEFAULT, new NoopEscalator(), new InternalQueryConfig(), - new NoopServiceEmitter(), + emitter, new PhysicalDatasourceMetadataFactory(globalTableJoinable, segmentManager), new NoopCoordinatorClient(), CentralizedDatasourceSchemaConfig.create() @@ -825,6 +829,8 @@ public void refresh( Assert.assertTrue(addSegmentLatch.await(1, TimeUnit.SECONDS)); + emitter.verifyValue(Metric.SCHEMA_ROW_SIGNATURE_COLUMN_COUNT, Map.of(DruidMetrics.DATASOURCE, "foo"), 6L); + Set dataSources = segments.stream().map(DataSegment::getDataSource).collect(Collectors.toSet()); dataSources.remove("foo2"); @@ -843,6 +849,9 @@ public void refresh( fooDs = schema.getDatasource("foo"); + emitter.verifyValue(Metric.SCHEMA_ROW_SIGNATURE_CHANGED, 
Map.of(DruidMetrics.DATASOURCE, "foo"), 1L); + emitter.verifySum(Metric.SCHEMA_ROW_SIGNATURE_COLUMN_COUNT, Map.of(DruidMetrics.DATASOURCE, "foo"), 13L); + // check if the new column present in the added segment is present in the datasource schema // ensuring that the schema is rebuilt Assert.assertTrue(fooDs.getRowSignature().getColumnNames().stream().anyMatch("dim3"::equals)); @@ -987,6 +996,11 @@ public void testLocalSegmentCacheSetsDataSourceAsBroadcastButNotJoinable() throw refreshLatch = new CountDownLatch(1); Assert.assertTrue(refreshLatch.await(WAIT_TIMEOUT_SECS, TimeUnit.SECONDS)); + emitter.verifyValue(Metric.SCHEMA_ROW_SIGNATURE_COLUMN_COUNT, Map.of(DruidMetrics.DATASOURCE, "foo"), 6L); + emitter.verifyValue(Metric.SCHEMA_ROW_SIGNATURE_COLUMN_COUNT, Map.of(DruidMetrics.DATASOURCE, "foo2"), 3L); + emitter.verifyValue(Metric.SCHEMA_ROW_SIGNATURE_COLUMN_COUNT, Map.of(DruidMetrics.DATASOURCE, "some_datasource"), 9L); + + fooTable = schema.getDatasource("foo"); Assert.assertNotNull(fooTable); Assert.assertTrue(fooTable.dataSource() instanceof TableDataSource); @@ -1043,7 +1057,7 @@ public void testRunSegmentMetadataQueryWithContext() throws Exception SEGMENT_CACHE_CONFIG_DEFAULT, new NoopEscalator(), internalQueryConfig, - new NoopServiceEmitter(), + emitter, new PhysicalDatasourceMetadataFactory(globalTableJoinable, segmentManager), new NoopCoordinatorClient(), CentralizedDatasourceSchemaConfig.create() @@ -1108,7 +1122,6 @@ public void testRefreshShouldEmitMetrics() throws InterruptedException, IOExcept { String dataSource = "xyz"; CountDownLatch addSegmentLatch = new CountDownLatch(2); - StubServiceEmitter emitter = new StubServiceEmitter("broker", "host"); BrokerSegmentMetadataCache schema = new BrokerSegmentMetadataCache( CalciteTests.createMockQueryLifecycleFactory(walker, conglomerate), serverView, @@ -1204,7 +1217,7 @@ public void testTombstoneSegmentIsNotRefreshed() throws IOException SEGMENT_CACHE_CONFIG_DEFAULT, new NoopEscalator(), 
internalQueryConfig, - new NoopServiceEmitter(), + emitter, new PhysicalDatasourceMetadataFactory(globalTableJoinable, segmentManager), new NoopCoordinatorClient(), CentralizedDatasourceSchemaConfig.create()