From 2b308a344c23b19853032ebad40328ac0f379033 Mon Sep 17 00:00:00 2001 From: Abhishek Balaji Radhakrishnan Date: Thu, 29 Jan 2026 13:29:56 -0800 Subject: [PATCH 1/4] Add new metrics and logging whenever the table schema is updated. We encountered a schema synchronization bug that sometimes results in invalid plans. Will add more details later. But wanted to get some observability improvements whenever the broker updates its metadata cache similar to the other schema refreshes. --- .../apache/druid/segment/metadata/Metric.java | 3 ++ .../schema/BrokerSegmentMetadataCache.java | 30 +++++++++++++++++-- 2 files changed, 30 insertions(+), 3 deletions(-) diff --git a/server/src/main/java/org/apache/druid/segment/metadata/Metric.java b/server/src/main/java/org/apache/druid/segment/metadata/Metric.java index a29f4a7eafff..358720b98a82 100644 --- a/server/src/main/java/org/apache/druid/segment/metadata/Metric.java +++ b/server/src/main/java/org/apache/druid/segment/metadata/Metric.java @@ -53,6 +53,9 @@ public class Metric public static final String REFRESH_SKIPPED_TOMBSTONES = PREFIX + "refresh/tombstone/count"; public static final String REFRESH_DURATION_MILLIS = PREFIX + "refresh/time"; public static final String DATASOURCE_REMOVED = PREFIX + "dataSource/removed"; + public static final String SCHEMA_ROW_SIGNATURE_INITIALIZED = PREFIX + "rowSignature/initialized"; + public static final String SCHEMA_ROW_SIGNATURE_CHANGED = PREFIX + "rowSignature/changed"; + public static final String SCHEMA_ROW_SIGNATURE_COLUMN_COUNT = PREFIX + "rowSignature/column/count"; /** * Number of used cold segments in the metadata store. diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/schema/BrokerSegmentMetadataCache.java b/sql/src/main/java/org/apache/druid/sql/calcite/schema/BrokerSegmentMetadataCache.java index 084671d25e4d..c93a5ea83432 100644 --- a/sql/src/main/java/org/apache/druid/sql/calcite/schema/BrokerSegmentMetadataCache.java +++ b/sql/src/main/java/org/apache/druid/sql/calcite/schema/BrokerSegmentMetadataCache.java @@ -358,10 +358,34 @@ private Map queryDataSourceInformation(Set Date: Thu, 29 Jan 2026 19:30:46 -0800 Subject: [PATCH 2/4] Docs and test --- docs/operations/metrics.md | 3 ++ .../schema/BrokerSegmentMetadataCache.java | 10 ++-- .../BrokerSegmentMetadataCacheTest.java | 46 ++++++++++++++----- 3 files changed, 43 insertions(+), 16 deletions(-) diff --git a/docs/operations/metrics.md b/docs/operations/metrics.md index 1a15a5314f88..6db82cad960e 100644 --- a/docs/operations/metrics.md +++ b/docs/operations/metrics.md @@ -462,6 +462,9 @@ These metrics are emitted by the Druid Coordinator in every run of the correspon |`segment/schemaCache/dataSource/removed`|Emitted when a datasource is removed from the Broker cache due to segments being marked as unused.|`dataSource`|| |`segment/schemaCache/refresh/time`|Time taken to refresh segments in coordinator segment schema cache.|`dataSource`|| |`segment/schemaCache/backfill/count`|Number of segments for which schema was back filled in the database.|`dataSource`|| +|`segment/schemaCache/rowSignature/initialized`|Emitted when a datasource row signature is initialized for the first time in the Broker's segment metadata cache.|`dataSource`|| +|`segment/schemaCache/rowSignature/changed`|Emitted when the cached row signature on the Broker's segment metadata cache for a datasource changes, indicating schema evolution or some form of flapping.|`dataSource`|| +|`segment/schemaCache/rowSignature/column/count`|Number of columns in the updated row signature for a datasource.|`dataSource`|| |`segment/schemaCache/realtime/count`|Number of realtime segments for which schema is cached.||Depends on the number of realtime segments in the cluster.| |`segment/schemaCache/used/count`|Number of published used segments for which schema is cached.||Depends on the number of segments in the cluster.| |`segment/schemaCache/usedFingerprint/count`|Number of unique schema fingerprints cached for published used segments.||Depends on the number of distinct schema in the cluster.| diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/schema/BrokerSegmentMetadataCache.java b/sql/src/main/java/org/apache/druid/sql/calcite/schema/BrokerSegmentMetadataCache.java index c93a5ea83432..a073d183cf87 100644 --- a/sql/src/main/java/org/apache/druid/sql/calcite/schema/BrokerSegmentMetadataCache.java +++ b/sql/src/main/java/org/apache/druid/sql/calcite/schema/BrokerSegmentMetadataCache.java @@ -359,7 +359,7 @@ private void updateDSMetadata(String dataSource, PhysicalDatasourceMetadata phys { final PhysicalDatasourceMetadata oldTable = tables.put(dataSource, physicalDatasourceMetadata); final RowSignature newRowSignature = physicalDatasourceMetadata.getRowSignature(); - final int newRowSignatureColumnCount = newRowSignature.getColumnNames().size(); + final int newColumnCount = newRowSignature.getColumnNames().size(); final ServiceMetricEvent.Builder builder = new ServiceMetricEvent.Builder().setDimension(DruidMetrics.DATASOURCE, dataSource); @@ -367,11 +367,11 @@ private void updateDSMetadata(String dataSource, PhysicalDatasourceMetadata phys if (oldTable == null) { log.info( "Row signature for datasource[%s] initialized with [%d] columns - signature[%s]", - dataSource, newRowSignatureColumnCount, newRowSignature + dataSource, newColumnCount, newRowSignature ); emitMetric(Metric.SCHEMA_ROW_SIGNATURE_INITIALIZED, 1, builder); - emitMetric(Metric.SCHEMA_ROW_SIGNATURE_COLUMN_COUNT, newRowSignatureColumnCount, builder); + emitMetric(Metric.SCHEMA_ROW_SIGNATURE_COLUMN_COUNT, newColumnCount, builder); return; } @@ -379,11 +379,11 @@ private void updateDSMetadata(String dataSource, PhysicalDatasourceMetadata phys if (!oldRowSignature.equals(newRowSignature)) { log.info( "Row signature for datasource[%s] updated from [%d] columns to [%d] columns - new signature[%s]", - dataSource, oldRowSignature.getColumnNames().size(), newRowSignatureColumnCount, newRowSignature + dataSource, oldRowSignature.getColumnNames().size(), newColumnCount, newRowSignature ); emitMetric(Metric.SCHEMA_ROW_SIGNATURE_CHANGED, 1, builder); - emitMetric(Metric.SCHEMA_ROW_SIGNATURE_COLUMN_COUNT, newRowSignatureColumnCount, builder); + emitMetric(Metric.SCHEMA_ROW_SIGNATURE_COLUMN_COUNT, newColumnCount, builder); } else { log.debug("Row signature for datasource[%s] is unchanged.", dataSource); } diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/schema/BrokerSegmentMetadataCacheTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/schema/BrokerSegmentMetadataCacheTest.java index d0e5c0123989..41ce5e30acf8 100644 --- a/sql/src/test/java/org/apache/druid/sql/calcite/schema/BrokerSegmentMetadataCacheTest.java +++ b/sql/src/test/java/org/apache/druid/sql/calcite/schema/BrokerSegmentMetadataCacheTest.java @@ -41,6 +41,7 @@ import org.apache.druid.java.util.common.Intervals; import org.apache.druid.java.util.common.Pair; import org.apache.druid.java.util.common.guava.Sequences; +import org.apache.druid.java.util.emitter.service.ServiceEmitter; import org.apache.druid.java.util.metrics.StubServiceEmitter; import org.apache.druid.query.DruidMetrics; import org.apache.druid.query.GlobalTableDataSource; @@ -117,6 +118,7 @@ public class BrokerSegmentMetadataCacheTest extends BrokerSegmentMetadataCacheTe private static final int WAIT_TIMEOUT_SECS = 6; private static final ObjectMapper MAPPER = TestHelper.makeJsonMapper(); private BrokerSegmentMetadataCache runningSchema; + private StubServiceEmitter emitter = new StubServiceEmitter(); private CountDownLatch buildTableLatch = new CountDownLatch(1); private CountDownLatch markDataSourceLatch = new CountDownLatch(1); private CountDownLatch refreshLatch = new CountDownLatch(1); @@ -156,7 +158,7 @@ public BrokerSegmentMetadataCache buildSchemaMarkAndTableLatch( config, new NoopEscalator(), new InternalQueryConfig(), - new NoopServiceEmitter(), + emitter, new PhysicalDatasourceMetadataFactory(globalTableJoinable, segmentManager), coordinatorClient, CentralizedDatasourceSchemaConfig.create() @@ -192,7 +194,7 @@ public BrokerSegmentMetadataCache buildSchemaMarkAndRefreshLatch() throws Interr SEGMENT_CACHE_CONFIG_DEFAULT, new NoopEscalator(), new InternalQueryConfig(), - new NoopServiceEmitter(), + emitter, new PhysicalDatasourceMetadataFactory(globalTableJoinable, segmentManager), new NoopCoordinatorClient(), CentralizedDatasourceSchemaConfig.create() @@ -260,7 +262,7 @@ public ListenableFuture> fetchDataSourceInformation( SEGMENT_CACHE_CONFIG_DEFAULT, new NoopEscalator(), new InternalQueryConfig(), - new NoopServiceEmitter(), + emitter, new PhysicalDatasourceMetadataFactory(globalTableJoinable, segmentManager), coordinatorClient, CentralizedDatasourceSchemaConfig.create() @@ -340,7 +342,7 @@ public ListenableFuture> fetchDataSourceInformation( SEGMENT_CACHE_CONFIG_DEFAULT, new NoopEscalator(), new InternalQueryConfig(), - new NoopServiceEmitter(), + emitter, new PhysicalDatasourceMetadataFactory(globalTableJoinable, segmentManager), coordinatorClient, CentralizedDatasourceSchemaConfig.create() @@ -373,7 +375,7 @@ public void testBrokerPollsAllDSSchema() throws InterruptedException SEGMENT_CACHE_CONFIG_DEFAULT, new NoopEscalator(), new InternalQueryConfig(), - new NoopServiceEmitter(), + emitter, new PhysicalDatasourceMetadataFactory(globalTableJoinable, segmentManager), coordinatorClient, CentralizedDatasourceSchemaConfig.create() @@ -409,7 +411,7 @@ public void testRefreshOnEachCycleCentralizedDatasourceSchemaEnabled() throws In SEGMENT_CACHE_CONFIG_DEFAULT, new NoopEscalator(), new InternalQueryConfig(), - new NoopServiceEmitter(), + emitter, new PhysicalDatasourceMetadataFactory(globalTableJoinable, segmentManager), new NoopCoordinatorClient(), config @@ -431,6 +433,14 @@ public void refresh(Set segmentsToRefresh, Set dataSourcesToR refreshLatch.await(WAIT_TIMEOUT_SECS, TimeUnit.SECONDS); + emitter.verifyValue(Metric.SCHEMA_ROW_SIGNATURE_INITIALIZED, Map.of(DruidMetrics.DATASOURCE, "foo"), 1L); + emitter.verifyValue(Metric.SCHEMA_ROW_SIGNATURE_INITIALIZED, Map.of(DruidMetrics.DATASOURCE, "foo2"), 1L); + emitter.verifyValue(Metric.SCHEMA_ROW_SIGNATURE_INITIALIZED, Map.of(DruidMetrics.DATASOURCE, "some_datasource"), 1L); + + emitter.verifyValue(Metric.SCHEMA_ROW_SIGNATURE_COLUMN_COUNT, Map.of(DruidMetrics.DATASOURCE, "foo"), 6L); + emitter.verifyValue(Metric.SCHEMA_ROW_SIGNATURE_COLUMN_COUNT, Map.of(DruidMetrics.DATASOURCE, "foo2"), 3L); + emitter.verifyValue(Metric.SCHEMA_ROW_SIGNATURE_COLUMN_COUNT, Map.of(DruidMetrics.DATASOURCE, "some_datasource"), 9L); + Assert.assertEquals(0, refreshLatch.getCount()); } @@ -449,7 +459,7 @@ public void testRefreshOnEachCycleCentralizedDatasourceSchemaDisabled() throws I SEGMENT_CACHE_CONFIG_DEFAULT, new NoopEscalator(), new InternalQueryConfig(), - new NoopServiceEmitter(), + emitter, new PhysicalDatasourceMetadataFactory(globalTableJoinable, segmentManager), new NoopCoordinatorClient(), CentralizedDatasourceSchemaConfig.create() @@ -726,7 +736,7 @@ public void testAllDatasourcesRebuiltOnDatasourceRemoval() throws IOException, I SEGMENT_CACHE_CONFIG_DEFAULT, new NoopEscalator(), new InternalQueryConfig(), - new NoopServiceEmitter(), + emitter, new PhysicalDatasourceMetadataFactory(globalTableJoinable, segmentManager), new NoopCoordinatorClient(), CentralizedDatasourceSchemaConfig.create() @@ -825,6 +835,9 @@ public void refresh( Assert.assertTrue(addSegmentLatch.await(1, TimeUnit.SECONDS)); + emitter.verifyValue(Metric.SCHEMA_ROW_SIGNATURE_INITIALIZED, Map.of(DruidMetrics.DATASOURCE, "foo"), 1L); + emitter.verifyValue(Metric.SCHEMA_ROW_SIGNATURE_COLUMN_COUNT, Map.of(DruidMetrics.DATASOURCE, "foo"), 6L); + Set dataSources = segments.stream().map(DataSegment::getDataSource).collect(Collectors.toSet()); dataSources.remove("foo2"); @@ -843,6 +856,9 @@ public void refresh( fooDs = schema.getDatasource("foo"); + emitter.verifyValue(Metric.SCHEMA_ROW_SIGNATURE_CHANGED, Map.of(DruidMetrics.DATASOURCE, "foo"), 1L); + emitter.verifySum(Metric.SCHEMA_ROW_SIGNATURE_COLUMN_COUNT, Map.of(DruidMetrics.DATASOURCE, "foo"), 13L); + // check if the new column present in the added segment is present in the datasource schema // ensuring that the schema is rebuilt Assert.assertTrue(fooDs.getRowSignature().getColumnNames().stream().anyMatch("dim3"::equals)); @@ -987,6 +1003,15 @@ public void testLocalSegmentCacheSetsDataSourceAsBroadcastButNotJoinable() throw refreshLatch = new CountDownLatch(1); Assert.assertTrue(refreshLatch.await(WAIT_TIMEOUT_SECS, TimeUnit.SECONDS)); + emitter.verifyValue(Metric.SCHEMA_ROW_SIGNATURE_INITIALIZED, Map.of(DruidMetrics.DATASOURCE, "foo"), 1L); + emitter.verifyValue(Metric.SCHEMA_ROW_SIGNATURE_INITIALIZED, Map.of(DruidMetrics.DATASOURCE, "foo2"), 1L); + emitter.verifyValue(Metric.SCHEMA_ROW_SIGNATURE_INITIALIZED, Map.of(DruidMetrics.DATASOURCE, "some_datasource"), 1L); + + emitter.verifyValue(Metric.SCHEMA_ROW_SIGNATURE_COLUMN_COUNT, Map.of(DruidMetrics.DATASOURCE, "foo"), 6L); + emitter.verifyValue(Metric.SCHEMA_ROW_SIGNATURE_COLUMN_COUNT, Map.of(DruidMetrics.DATASOURCE, "foo2"), 3L); + emitter.verifyValue(Metric.SCHEMA_ROW_SIGNATURE_COLUMN_COUNT, Map.of(DruidMetrics.DATASOURCE, "some_datasource"), 9L); + + fooTable = schema.getDatasource("foo"); Assert.assertNotNull(fooTable); Assert.assertTrue(fooTable.dataSource() instanceof TableDataSource); @@ -1043,7 +1068,7 @@ public void testRunSegmentMetadataQueryWithContext() throws Exception SEGMENT_CACHE_CONFIG_DEFAULT, new NoopEscalator(), internalQueryConfig, - new NoopServiceEmitter(), + emitter, new PhysicalDatasourceMetadataFactory(globalTableJoinable, segmentManager), new NoopCoordinatorClient(), CentralizedDatasourceSchemaConfig.create() @@ -1108,7 +1133,6 @@ public void testRefreshShouldEmitMetrics() throws InterruptedException, IOExcept { String dataSource = "xyz"; CountDownLatch addSegmentLatch = new CountDownLatch(2); - StubServiceEmitter emitter = new StubServiceEmitter("broker", "host"); BrokerSegmentMetadataCache schema = new BrokerSegmentMetadataCache( CalciteTests.createMockQueryLifecycleFactory(walker, conglomerate), serverView, @@ -1204,7 +1228,7 @@ public void testTombstoneSegmentIsNotRefreshed() throws IOException SEGMENT_CACHE_CONFIG_DEFAULT, new NoopEscalator(), internalQueryConfig, - new NoopServiceEmitter(), + emitter, new PhysicalDatasourceMetadataFactory(globalTableJoinable, segmentManager), new NoopCoordinatorClient(), CentralizedDatasourceSchemaConfig.create() From 4e4f63af1ca1b38d073a01e0e492857842df051d Mon Sep 17 00:00:00 2001 From: Abhishek Balaji Radhakrishnan Date: Thu, 29 Jan 2026 19:39:25 -0800 Subject: [PATCH 3/4] Checkstyle --- .../sql/calcite/schema/BrokerSegmentMetadataCacheTest.java | 2 -- 1 file changed, 2 deletions(-) diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/schema/BrokerSegmentMetadataCacheTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/schema/BrokerSegmentMetadataCacheTest.java index 41ce5e30acf8..fa489dae5400 100644 --- a/sql/src/test/java/org/apache/druid/sql/calcite/schema/BrokerSegmentMetadataCacheTest.java +++ b/sql/src/test/java/org/apache/druid/sql/calcite/schema/BrokerSegmentMetadataCacheTest.java @@ -41,7 +41,6 @@ import org.apache.druid.java.util.common.Intervals; import org.apache.druid.java.util.common.Pair; import org.apache.druid.java.util.common.guava.Sequences; -import org.apache.druid.java.util.emitter.service.ServiceEmitter; import org.apache.druid.java.util.metrics.StubServiceEmitter; import org.apache.druid.query.DruidMetrics; import org.apache.druid.query.GlobalTableDataSource; @@ -71,7 +70,6 @@ import org.apache.druid.server.QueryResponse; import org.apache.druid.server.coordination.DruidServerMetadata; import org.apache.druid.server.coordination.ServerType; -import org.apache.druid.server.metrics.NoopServiceEmitter; import org.apache.druid.server.security.AllowAllAuthenticator; import org.apache.druid.server.security.AuthorizationResult; import org.apache.druid.server.security.NoopEscalator; From 19bfc55bbf6407c1c3ec46dde2637f41d274e5bd Mon Sep 17 00:00:00 2001 From: Abhishek Balaji Radhakrishnan Date: Tue, 3 Feb 2026 23:38:43 -0800 Subject: [PATCH 4/4] Review --- docs/operations/metrics.md | 3 +-- .../java/org/apache/druid/segment/metadata/Metric.java | 1 - .../sql/calcite/schema/BrokerSegmentMetadataCache.java | 1 - .../calcite/schema/BrokerSegmentMetadataCacheTest.java | 9 --------- 4 files changed, 1 insertion(+), 13 deletions(-) diff --git a/docs/operations/metrics.md b/docs/operations/metrics.md index 6db82cad960e..690738cbc9ea 100644 --- a/docs/operations/metrics.md +++ b/docs/operations/metrics.md @@ -462,9 +462,8 @@ These metrics are emitted by the Druid Coordinator in every run of the correspon |`segment/schemaCache/dataSource/removed`|Emitted when a datasource is removed from the Broker cache due to segments being marked as unused.|`dataSource`|| |`segment/schemaCache/refresh/time`|Time taken to refresh segments in coordinator segment schema cache.|`dataSource`|| |`segment/schemaCache/backfill/count`|Number of segments for which schema was back filled in the database.|`dataSource`|| -|`segment/schemaCache/rowSignature/initialized`|Emitted when a datasource row signature is initialized for the first time in the Broker's segment metadata cache.|`dataSource`|| |`segment/schemaCache/rowSignature/changed`|Emitted when the cached row signature on the Broker's segment metadata cache for a datasource changes, indicating schema evolution or some form of flapping.|`dataSource`|| -|`segment/schemaCache/rowSignature/column/count`|Number of columns in the updated row signature for a datasource.|`dataSource`|| +|`segment/schemaCache/rowSignature/column/count`|Number of columns in the row signature on the Broker's segment metadata cache for a datasource when it's initialized or updated.|`dataSource`|| |`segment/schemaCache/realtime/count`|Number of realtime segments for which schema is cached.||Depends on the number of realtime segments in the cluster.| |`segment/schemaCache/used/count`|Number of published used segments for which schema is cached.||Depends on the number of segments in the cluster.| |`segment/schemaCache/usedFingerprint/count`|Number of unique schema fingerprints cached for published used segments.||Depends on the number of distinct schema in the cluster.| diff --git a/server/src/main/java/org/apache/druid/segment/metadata/Metric.java b/server/src/main/java/org/apache/druid/segment/metadata/Metric.java index 358720b98a82..96e53a3a98ab 100644 --- a/server/src/main/java/org/apache/druid/segment/metadata/Metric.java +++ b/server/src/main/java/org/apache/druid/segment/metadata/Metric.java @@ -53,7 +53,6 @@ public class Metric public static final String REFRESH_SKIPPED_TOMBSTONES = PREFIX + "refresh/tombstone/count"; public static final String REFRESH_DURATION_MILLIS = PREFIX + "refresh/time"; public static final String DATASOURCE_REMOVED = PREFIX + "dataSource/removed"; - public static final String SCHEMA_ROW_SIGNATURE_INITIALIZED = PREFIX + "rowSignature/initialized"; public static final String SCHEMA_ROW_SIGNATURE_CHANGED = PREFIX + "rowSignature/changed"; public static final String SCHEMA_ROW_SIGNATURE_COLUMN_COUNT = PREFIX + "rowSignature/column/count"; diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/schema/BrokerSegmentMetadataCache.java b/sql/src/main/java/org/apache/druid/sql/calcite/schema/BrokerSegmentMetadataCache.java index a073d183cf87..26a6c63660d4 100644 --- a/sql/src/main/java/org/apache/druid/sql/calcite/schema/BrokerSegmentMetadataCache.java +++ b/sql/src/main/java/org/apache/druid/sql/calcite/schema/BrokerSegmentMetadataCache.java @@ -370,7 +370,6 @@ private void updateDSMetadata(String dataSource, PhysicalDatasourceMetadata phys dataSource, newColumnCount, newRowSignature ); - emitMetric(Metric.SCHEMA_ROW_SIGNATURE_INITIALIZED, 1, builder); emitMetric(Metric.SCHEMA_ROW_SIGNATURE_COLUMN_COUNT, newColumnCount, builder); return; } diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/schema/BrokerSegmentMetadataCacheTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/schema/BrokerSegmentMetadataCacheTest.java index fa489dae5400..c3a77888d4e6 100644 --- a/sql/src/test/java/org/apache/druid/sql/calcite/schema/BrokerSegmentMetadataCacheTest.java +++ b/sql/src/test/java/org/apache/druid/sql/calcite/schema/BrokerSegmentMetadataCacheTest.java @@ -431,10 +431,6 @@ public void refresh(Set segmentsToRefresh, Set dataSourcesToR refreshLatch.await(WAIT_TIMEOUT_SECS, TimeUnit.SECONDS); - emitter.verifyValue(Metric.SCHEMA_ROW_SIGNATURE_INITIALIZED, Map.of(DruidMetrics.DATASOURCE, "foo"), 1L); - emitter.verifyValue(Metric.SCHEMA_ROW_SIGNATURE_INITIALIZED, Map.of(DruidMetrics.DATASOURCE, "foo2"), 1L); - emitter.verifyValue(Metric.SCHEMA_ROW_SIGNATURE_INITIALIZED, Map.of(DruidMetrics.DATASOURCE, "some_datasource"), 1L); - emitter.verifyValue(Metric.SCHEMA_ROW_SIGNATURE_COLUMN_COUNT, Map.of(DruidMetrics.DATASOURCE, "foo"), 6L); emitter.verifyValue(Metric.SCHEMA_ROW_SIGNATURE_COLUMN_COUNT, Map.of(DruidMetrics.DATASOURCE, "foo2"), 3L); emitter.verifyValue(Metric.SCHEMA_ROW_SIGNATURE_COLUMN_COUNT, Map.of(DruidMetrics.DATASOURCE, "some_datasource"), 9L); @@ -833,7 +829,6 @@ public void refresh( Assert.assertTrue(addSegmentLatch.await(1, TimeUnit.SECONDS)); - emitter.verifyValue(Metric.SCHEMA_ROW_SIGNATURE_INITIALIZED, Map.of(DruidMetrics.DATASOURCE, "foo"), 1L); emitter.verifyValue(Metric.SCHEMA_ROW_SIGNATURE_COLUMN_COUNT, Map.of(DruidMetrics.DATASOURCE, "foo"), 6L); Set dataSources = segments.stream().map(DataSegment::getDataSource).collect(Collectors.toSet()); @@ -1001,10 +996,6 @@ public void testLocalSegmentCacheSetsDataSourceAsBroadcastButNotJoinable() throw refreshLatch = new CountDownLatch(1); Assert.assertTrue(refreshLatch.await(WAIT_TIMEOUT_SECS, TimeUnit.SECONDS)); - emitter.verifyValue(Metric.SCHEMA_ROW_SIGNATURE_INITIALIZED, Map.of(DruidMetrics.DATASOURCE, "foo"), 1L); - emitter.verifyValue(Metric.SCHEMA_ROW_SIGNATURE_INITIALIZED, Map.of(DruidMetrics.DATASOURCE, "foo2"), 1L); - emitter.verifyValue(Metric.SCHEMA_ROW_SIGNATURE_INITIALIZED, Map.of(DruidMetrics.DATASOURCE, "some_datasource"), 1L); - emitter.verifyValue(Metric.SCHEMA_ROW_SIGNATURE_COLUMN_COUNT, Map.of(DruidMetrics.DATASOURCE, "foo"), 6L); emitter.verifyValue(Metric.SCHEMA_ROW_SIGNATURE_COLUMN_COUNT, Map.of(DruidMetrics.DATASOURCE, "foo2"), 3L); emitter.verifyValue(Metric.SCHEMA_ROW_SIGNATURE_COLUMN_COUNT, Map.of(DruidMetrics.DATASOURCE, "some_datasource"), 9L);