From bfe9529692c8a21a72ef7d1e7ac9a96c010f41ac Mon Sep 17 00:00:00 2001 From: Uddeshya Singh Date: Wed, 23 Jul 2025 15:25:53 +0530 Subject: [PATCH 01/14] implement the segmentDiscoveryStatsProvider and counter and hook it up --- .../indexing/EmbeddedIndexTaskTest.java | 8 ++- .../EmbeddedKafkaClusterMetricsTest.java | 1 + .../msq/EmbeddedMSQRealtimeQueryTest.java | 3 +- .../apache/druid/client/BrokerServerView.java | 53 ++++++++++++++++- .../apache/druid/server/SegmentManager.java | 1 - .../metrics/SegmentDiscoveryStatsMonitor.java | 57 +++++++++++++++++++ .../SegmentDiscoveryStatsProvider.java | 13 +++++ .../java/org/apache/druid/cli/CliBroker.java | 2 + 8 files changed, 133 insertions(+), 5 deletions(-) create mode 100644 server/src/main/java/org/apache/druid/server/metrics/SegmentDiscoveryStatsMonitor.java create mode 100644 server/src/main/java/org/apache/druid/server/metrics/SegmentDiscoveryStatsProvider.java diff --git a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/indexing/EmbeddedIndexTaskTest.java b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/indexing/EmbeddedIndexTaskTest.java index ba3c16b81b56..6c42998ef0cc 100644 --- a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/indexing/EmbeddedIndexTaskTest.java +++ b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/indexing/EmbeddedIndexTaskTest.java @@ -61,8 +61,10 @@ public class EmbeddedIndexTaskTest extends EmbeddedClusterTestBase @Override public EmbeddedDruidCluster createCluster() { + broker.addProperty("druid.monitoring.monitors", "[\"org.apache.druid.server.metrics.SegmentDiscoveryStatsMonitor\"]"); return EmbeddedDruidCluster.withEmbeddedDerbyAndZookeeper() .useLatchableEmitter() + .addCommonProperty("druid.monitoring.emissionPeriod", "PT0.1s") .addServer(coordinator) .addServer(indexer) .addServer(overlord) @@ -107,8 +109,10 @@ public void test_runIndexTask_forInlineDatasource() .hasDimension(DruidMetrics.DATASOURCE, dataSource), agg -> agg.hasSumAtLeast(10) ); - broker.latchableEmitter().waitForEvent( - event -> event.hasDimension(DruidMetrics.DATASOURCE, dataSource) + broker.latchableEmitter().waitForEventAggregate( + event -> event.hasMetricName("segment/discover/success") + .hasDimension(DruidMetrics.DATASOURCE, dataSource), + agg -> agg.hasCountAtLeast(10) ); Assertions.assertEquals(Resources.CSV_DATA_10_DAYS, cluster.runSql("SELECT * FROM %s", dataSource)); Assertions.assertEquals("10", cluster.runSql("SELECT COUNT(*) FROM %s", dataSource)); diff --git a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/indexing/EmbeddedKafkaClusterMetricsTest.java b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/indexing/EmbeddedKafkaClusterMetricsTest.java index 8d10bae6e4c1..0953fe0980fe 100644 --- a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/indexing/EmbeddedKafkaClusterMetricsTest.java +++ b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/indexing/EmbeddedKafkaClusterMetricsTest.java @@ -113,6 +113,7 @@ public void stop() .addProperty("druid.manager.segments.killUnused.bufferPeriod", "PT0.1s") .addProperty("druid.manager.segments.killUnused.dutyPeriod", "PT1s"); coordinator.addProperty("druid.manager.segments.useIncrementalCache", "ifSynced"); + broker.addProperty("druid.monitoring.monitors", "[\"org.apache.druid.server.metrics.SegmentDiscoveryStatsMonitor\"]"); cluster.addExtension(KafkaIndexTaskModule.class) .addExtension(KafkaEmitterModule.class) .addExtension(LatchableEmitterModule.class) diff --git a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/msq/EmbeddedMSQRealtimeQueryTest.java b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/msq/EmbeddedMSQRealtimeQueryTest.java index f10cab7a268d..d526a2d338cf 100644 --- a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/msq/EmbeddedMSQRealtimeQueryTest.java +++ b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/msq/EmbeddedMSQRealtimeQueryTest.java @@ -177,7 +177,8 @@ void setUpEach() agg -> agg.hasSumAtLeast(totalRows) ); broker.latchableEmitter().waitForEvent( - event -> event.hasDimension(DruidMetrics.DATASOURCE, dataSource) + event -> event.hasMetricName("segment/discover/success") + .hasDimension(DruidMetrics.DATASOURCE, dataSource) ); } diff --git a/server/src/main/java/org/apache/druid/client/BrokerServerView.java b/server/src/main/java/org/apache/druid/client/BrokerServerView.java index 95dc86367250..b4e0084e1c9c 100644 --- a/server/src/main/java/org/apache/druid/client/BrokerServerView.java +++ b/server/src/main/java/org/apache/druid/client/BrokerServerView.java @@ -37,10 +37,15 @@ import org.apache.druid.segment.realtime.appenderator.SegmentSchemas; import org.apache.druid.server.coordination.DruidServerMetadata; import org.apache.druid.server.coordination.ServerType; +import org.apache.druid.server.coordinator.stats.Dimension; +import org.apache.druid.server.coordinator.stats.RowKey; +import org.apache.druid.server.metrics.SegmentDiscoveryStatsProvider; import org.apache.druid.timeline.DataSegment; import org.apache.druid.timeline.SegmentId; import org.apache.druid.timeline.VersionedIntervalTimeline; import org.apache.druid.timeline.partition.PartitionChunk; +import org.apache.druid.utils.CollectionUtils; +import org.checkerframework.checker.lock.qual.GuardedBy; import java.util.ArrayList; import java.util.HashMap; @@ -52,6 +57,7 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.Executor; import java.util.concurrent.ExecutorService; +import java.util.concurrent.atomic.AtomicLong; import java.util.function.Function; import java.util.stream.Collectors; @@ -59,7 +65,7 @@ * */ @ManageLifecycle -public class BrokerServerView implements TimelineServerView +public class BrokerServerView implements TimelineServerView, SegmentDiscoveryStatsProvider { private static final Logger log = new Logger(BrokerServerView.class); @@ -76,6 +82,10 @@ public class BrokerServerView implements TimelineServerView private final CountDownLatch initialized = new CountDownLatch(1); private final FilteredServerInventoryView baseView; private final BrokerViewOfCoordinatorConfig brokerViewOfCoordinatorConfig; + private final ConcurrentHashMap totalSuccessfulSegmentLoadCount = new ConcurrentHashMap<>(); + @GuardedBy("totalSuccessfulSegmentLoadCount") + private Map previousSuccessfulSegmentLoadCount = new HashMap<>(); + @Inject public BrokerServerView( @@ -281,6 +291,7 @@ private void serverAddedSegment(final DruidServerMetadata server, final DataSegm timeline.add(segment.getInterval(), segment.getVersion(), segment.getShardSpec().createChunk(selector)); selectors.put(segmentId, selector); + incrementAndGetLong(totalSuccessfulSegmentLoadCount, getMetricKey(segment)); } QueryableDruidServer queryableDruidServer = clients.get(server.getName()); @@ -436,4 +447,44 @@ public List getDruidServers() .map(queryableDruidServer -> queryableDruidServer.getServer().toImmutableDruidServer()) .collect(Collectors.toList()); } + + @Override + public Map getTotalSuccessfulSegmentLoadCount() + { + final Map total = CollectionUtils.mapValues(totalSuccessfulSegmentLoadCount, AtomicLong::get); + synchronized (totalSuccessfulSegmentLoadCount) { + final Map delta = getDeltaValues(total, previousSuccessfulSegmentLoadCount); + previousSuccessfulSegmentLoadCount = total; + return delta; + } + } + + private static long incrementAndGetLong(ConcurrentHashMap counters, RowKey key) + { + AtomicLong counter = counters.get(key); + if (counter == null) { + counter = counters.computeIfAbsent(key, k -> new AtomicLong()); + } + return counter.incrementAndGet(); + } + + private Map getDeltaValues(Map total, Map prev) + { + final Map deltaValues = new HashMap<>(); + total.forEach( + (dataSource, totalCount) -> deltaValues.put( + dataSource, + totalCount - prev.getOrDefault(dataSource, 0L) + ) + ); + return deltaValues; + } + + private static RowKey getMetricKey(final DataSegment segment) + { + if (segment == null) { + return RowKey.empty(); + } + return RowKey.with(Dimension.DATASOURCE, segment.getDataSource()).build(); + } } diff --git a/server/src/main/java/org/apache/druid/server/SegmentManager.java b/server/src/main/java/org/apache/druid/server/SegmentManager.java index 060bf285114b..2a9477aaa7da 100644 --- a/server/src/main/java/org/apache/druid/server/SegmentManager.java +++ b/server/src/main/java/org/apache/druid/server/SegmentManager.java @@ -369,7 +369,6 @@ public void shutdownBootstrap() cacheManager.shutdownBootstrap(); } - /** * Represent the state of a data source including the timeline, total segment size, and number of segments. */ diff --git a/server/src/main/java/org/apache/druid/server/metrics/SegmentDiscoveryStatsMonitor.java b/server/src/main/java/org/apache/druid/server/metrics/SegmentDiscoveryStatsMonitor.java new file mode 100644 index 000000000000..92299b535a62 --- /dev/null +++ b/server/src/main/java/org/apache/druid/server/metrics/SegmentDiscoveryStatsMonitor.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.server.metrics; + +import com.google.inject.Inject; +import org.apache.druid.java.util.emitter.service.ServiceEmitter; +import org.apache.druid.java.util.emitter.service.ServiceMetricEvent; +import org.apache.druid.java.util.metrics.AbstractMonitor; +import org.apache.druid.server.coordinator.stats.RowKey; + +import java.util.Map; + +public class SegmentDiscoveryStatsMonitor extends AbstractMonitor +{ + private final SegmentDiscoveryStatsProvider statsProvider; + + @Inject + public SegmentDiscoveryStatsMonitor(SegmentDiscoveryStatsProvider statsProvider) + { + this.statsProvider = statsProvider; + } + + @Override + public boolean doMonitor(ServiceEmitter emitter) + { + emit(emitter, "segment/discover/success", statsProvider.getTotalSuccessfulSegmentLoadCount()); + return true; + } + + private void emit(ServiceEmitter emitter, String key, Map counts) + { + final ServiceMetricEvent.Builder builder = new ServiceMetricEvent.Builder(); + if (counts != null) { + counts.forEach((k, v) -> { + k.getValues().forEach((dim, value) -> builder.setDimension(dim.reportedName(), value)); + emitter.emit(builder.setMetric(key, v)); + }); + } + } +} diff --git a/server/src/main/java/org/apache/druid/server/metrics/SegmentDiscoveryStatsProvider.java b/server/src/main/java/org/apache/druid/server/metrics/SegmentDiscoveryStatsProvider.java new file mode 100644 index 000000000000..675e4f06b70e --- /dev/null +++ b/server/src/main/java/org/apache/druid/server/metrics/SegmentDiscoveryStatsProvider.java @@ -0,0 +1,13 @@ +package org.apache.druid.server.metrics; + +import org.apache.druid.server.coordinator.stats.RowKey; + +import java.util.Map; + +public interface SegmentDiscoveryStatsProvider +{ + /** + * Return the number of successful segment loads for each datasource during the emission period. + */ + Map getTotalSuccessfulSegmentLoadCount(); +} diff --git a/services/src/main/java/org/apache/druid/cli/CliBroker.java b/services/src/main/java/org/apache/druid/cli/CliBroker.java index 7e3c11a66311..5031cea6c046 100644 --- a/services/src/main/java/org/apache/druid/cli/CliBroker.java +++ b/services/src/main/java/org/apache/druid/cli/CliBroker.java @@ -76,6 +76,7 @@ import org.apache.druid.server.http.SelfDiscoveryResource; import org.apache.druid.server.initialization.jetty.JettyServerInitializer; import org.apache.druid.server.metrics.QueryCountStatsProvider; +import org.apache.druid.server.metrics.SegmentDiscoveryStatsProvider; import org.apache.druid.server.metrics.SubqueryCountStatsProvider; import org.apache.druid.server.router.TieredBrokerConfig; import org.apache.druid.sql.calcite.schema.MetadataSegmentView; @@ -141,6 +142,7 @@ protected List getModules() LifecycleModule.register(binder, MetadataSegmentView.class); binder.bind(TimelineServerView.class).to(BrokerServerView.class).in(LazySingleton.class); binder.bind(QueryableDruidServer.Maker.class).to(DirectDruidClientFactory.class).in(LazySingleton.class); + binder.bind(SegmentDiscoveryStatsProvider.class).to(BrokerServerView.class); JsonConfigProvider.bind(binder, "druid.broker.cache", CacheConfig.class); binder.install(new CacheModule()); From f2a174a1f16737165b11f268e1f8cd042005f57d Mon Sep 17 00:00:00 2001 From: Uddeshya Singh Date: Thu, 24 Jul 2025 15:57:59 +0530 Subject: [PATCH 02/14] Address review comments --- .../indexing/EmbeddedIndexTaskTest.java | 9 ++-- .../EmbeddedKafkaClusterMetricsTest.java | 2 +- .../msq/EmbeddedMSQRealtimeQueryTest.java | 11 +++-- .../server/EmbeddedHighAvailabilityTest.java | 8 ++++ .../apache/druid/client/BrokerServerView.java | 47 +++++++++---------- .../apache/druid/server/SegmentManager.java | 1 + .../server/coordinator/stats/Dimension.java | 4 +- ...va => BrokerSegmentCountStatsMonitor.java} | 11 +++-- .../BrokerSegmentCountStatsProvider.java | 32 +++++++++++++ .../SegmentDiscoveryStatsProvider.java | 13 ----- .../druid/client/BrokerServerViewTest.java | 30 ++++++++++++ .../java/org/apache/druid/cli/CliBroker.java | 4 +- 12 files changed, 117 insertions(+), 55 deletions(-) rename server/src/main/java/org/apache/druid/server/metrics/{SegmentDiscoveryStatsMonitor.java => BrokerSegmentCountStatsMonitor.java} (80%) create mode 100644 server/src/main/java/org/apache/druid/server/metrics/BrokerSegmentCountStatsProvider.java delete mode 100644 server/src/main/java/org/apache/druid/server/metrics/SegmentDiscoveryStatsProvider.java diff --git a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/indexing/EmbeddedIndexTaskTest.java b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/indexing/EmbeddedIndexTaskTest.java index 6c42998ef0cc..926f7ad35ce1 100644 --- a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/indexing/EmbeddedIndexTaskTest.java +++ b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/indexing/EmbeddedIndexTaskTest.java @@ -61,7 +61,7 @@ public class EmbeddedIndexTaskTest extends EmbeddedClusterTestBase @Override public EmbeddedDruidCluster createCluster() { - broker.addProperty("druid.monitoring.monitors", "[\"org.apache.druid.server.metrics.SegmentDiscoveryStatsMonitor\"]"); + broker.addProperty("druid.monitoring.monitors", "[\"org.apache.druid.server.metrics.BrokerSegmentCountStatsMonitor\"]"); return EmbeddedDruidCluster.withEmbeddedDerbyAndZookeeper() .useLatchableEmitter() .addCommonProperty("druid.monitoring.emissionPeriod", "PT0.1s") @@ -109,10 +109,9 @@ public void test_runIndexTask_forInlineDatasource() .hasDimension(DruidMetrics.DATASOURCE, dataSource), agg -> agg.hasSumAtLeast(10) ); - broker.latchableEmitter().waitForEventAggregate( - event -> event.hasMetricName("segment/discover/success") - .hasDimension(DruidMetrics.DATASOURCE, dataSource), - agg -> agg.hasCountAtLeast(10) + broker.latchableEmitter().waitForEvent( + event -> event.hasMetricName("segment/available/count") + .hasDimension(DruidMetrics.DATASOURCE, dataSource) ); Assertions.assertEquals(Resources.CSV_DATA_10_DAYS, cluster.runSql("SELECT * FROM %s", dataSource)); Assertions.assertEquals("10", cluster.runSql("SELECT COUNT(*) FROM %s", dataSource)); diff --git a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/indexing/EmbeddedKafkaClusterMetricsTest.java b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/indexing/EmbeddedKafkaClusterMetricsTest.java index 0953fe0980fe..27112691dac8 100644 --- a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/indexing/EmbeddedKafkaClusterMetricsTest.java +++ b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/indexing/EmbeddedKafkaClusterMetricsTest.java @@ -113,7 +113,7 @@ public void stop() .addProperty("druid.manager.segments.killUnused.bufferPeriod", "PT0.1s") .addProperty("druid.manager.segments.killUnused.dutyPeriod", "PT1s"); coordinator.addProperty("druid.manager.segments.useIncrementalCache", "ifSynced"); - broker.addProperty("druid.monitoring.monitors", "[\"org.apache.druid.server.metrics.SegmentDiscoveryStatsMonitor\"]"); + broker.addProperty("druid.monitoring.monitors", "[\"org.apache.druid.server.metrics.BrokerSegmentCountStatsMonitor\"]"); cluster.addExtension(KafkaIndexTaskModule.class) .addExtension(KafkaEmitterModule.class) .addExtension(LatchableEmitterModule.class) diff --git a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/msq/EmbeddedMSQRealtimeQueryTest.java b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/msq/EmbeddedMSQRealtimeQueryTest.java index d526a2d338cf..e0f1ce0819d8 100644 --- a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/msq/EmbeddedMSQRealtimeQueryTest.java +++ b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/msq/EmbeddedMSQRealtimeQueryTest.java @@ -104,7 +104,9 @@ public EmbeddedDruidCluster createCluster() .addProperty("druid.manager.segments.pollDuration", "PT0.1s"); broker.addProperty("druid.msq.dart.controller.heapFraction", "0.9") - .addProperty("druid.query.default.context.maxConcurrentStages", "1"); + .addProperty("druid.query.default.context.maxConcurrentStages", "1") + .addProperty("druid.monitoring.emissionPeriod", "PT0.1s") + .addProperty("druid.monitoring.monitors", "[\"org.apache.druid.server.metrics.BrokerSegmentCountStatsMonitor\"]"); historical.addProperty("druid.msq.dart.worker.heapFraction", "0.9") .addProperty("druid.msq.dart.worker.concurrentQueries", "1") @@ -176,9 +178,10 @@ void setUpEach() .hasDimension(DruidMetrics.DATASOURCE, Collections.singletonList(dataSource)), agg -> agg.hasSumAtLeast(totalRows) ); - broker.latchableEmitter().waitForEvent( - event -> event.hasMetricName("segment/discover/success") - .hasDimension(DruidMetrics.DATASOURCE, dataSource) + broker.latchableEmitter().waitForEventAggregate( + event -> event.hasMetricName("segment/available/count") + .hasDimension(DruidMetrics.DATASOURCE, dataSource), + agg -> agg.hasCountAtLeast(10) ); } diff --git a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/server/EmbeddedHighAvailabilityTest.java b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/server/EmbeddedHighAvailabilityTest.java index 2f5a0cfb173f..bd3183fa7184 100644 --- a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/server/EmbeddedHighAvailabilityTest.java +++ b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/server/EmbeddedHighAvailabilityTest.java @@ -31,6 +31,7 @@ import org.apache.druid.java.util.http.client.Request; import org.apache.druid.java.util.http.client.response.StatusResponseHandler; import org.apache.druid.java.util.http.client.response.StatusResponseHolder; +import org.apache.druid.query.DruidMetrics; import org.apache.druid.testing.embedded.EmbeddedBroker; import org.apache.druid.testing.embedded.EmbeddedClusterApis; import org.apache.druid.testing.embedded.EmbeddedCoordinator; @@ -71,6 +72,9 @@ protected EmbeddedDruidCluster createCluster() overlord1.addProperty("druid.plaintextPort", "7090"); coordinator1.addProperty("druid.plaintextPort", "7081"); + broker.addProperty("druid.monitoring.monitors", "[\"org.apache.druid.server.metrics.BrokerSegmentCountStatsMonitor\"]") + .addProperty("druid.monitoring.emissionPeriod", "PT0.1s"); + return EmbeddedDruidCluster.withEmbeddedDerbyAndZookeeper() .useLatchableEmitter() .addServer(coordinator1) @@ -119,6 +123,10 @@ public void test_switchLeader_andVerifyUsingSysTables() // Run sys queries, switch leaders, repeat ServerPair overlordPair = createServerPair(overlord1, overlord2); ServerPair coordinatorPair = createServerPair(coordinator1, coordinator2); + broker.latchableEmitter().waitForEvent( + event -> event.hasMetricName("segment/available/count") + .hasDimension(DruidMetrics.DATASOURCE, dataSource) + ); for (int i = 0; i < 3; ++i) { Assertions.assertEquals( "1", diff --git a/server/src/main/java/org/apache/druid/client/BrokerServerView.java b/server/src/main/java/org/apache/druid/client/BrokerServerView.java index b4e0084e1c9c..be23d55ac2b1 100644 --- a/server/src/main/java/org/apache/druid/client/BrokerServerView.java +++ b/server/src/main/java/org/apache/druid/client/BrokerServerView.java @@ -39,13 +39,12 @@ import org.apache.druid.server.coordination.ServerType; import org.apache.druid.server.coordinator.stats.Dimension; import org.apache.druid.server.coordinator.stats.RowKey; -import org.apache.druid.server.metrics.SegmentDiscoveryStatsProvider; +import org.apache.druid.server.metrics.BrokerSegmentCountStatsProvider; import org.apache.druid.timeline.DataSegment; import org.apache.druid.timeline.SegmentId; import org.apache.druid.timeline.VersionedIntervalTimeline; import org.apache.druid.timeline.partition.PartitionChunk; import org.apache.druid.utils.CollectionUtils; -import org.checkerframework.checker.lock.qual.GuardedBy; import java.util.ArrayList; import java.util.HashMap; @@ -65,7 +64,7 @@ * */ @ManageLifecycle -public class BrokerServerView implements TimelineServerView, SegmentDiscoveryStatsProvider +public class BrokerServerView implements TimelineServerView, BrokerSegmentCountStatsProvider { private static final Logger log = new Logger(BrokerServerView.class); @@ -82,10 +81,8 @@ public class BrokerServerView implements TimelineServerView, SegmentDiscoverySta private final CountDownLatch initialized = new CountDownLatch(1); private final FilteredServerInventoryView baseView; private final BrokerViewOfCoordinatorConfig brokerViewOfCoordinatorConfig; - private final ConcurrentHashMap totalSuccessfulSegmentLoadCount = new ConcurrentHashMap<>(); - @GuardedBy("totalSuccessfulSegmentLoadCount") - private Map previousSuccessfulSegmentLoadCount = new HashMap<>(); + private final ConcurrentHashMap segmentAvailableCount = new ConcurrentHashMap<>(); @Inject public BrokerServerView( @@ -169,7 +166,8 @@ public CallbackAction segmentSchemasAnnounced(SegmentSchemas segmentSchemas) baseView.registerServerCallback( exec, - new ServerCallback() { + new ServerCallback() + { @Override public CallbackAction serverAdded(DruidServer server) { @@ -291,7 +289,7 @@ private void serverAddedSegment(final DruidServerMetadata server, final DataSegm timeline.add(segment.getInterval(), segment.getVersion(), segment.getShardSpec().createChunk(selector)); selectors.put(segmentId, selector); - incrementAndGetLong(totalSuccessfulSegmentLoadCount, getMetricKey(segment)); + incrementAndGetLong(segmentAvailableCount, getMetricKey(segment)); } QueryableDruidServer queryableDruidServer = clients.get(server.getName()); @@ -368,6 +366,7 @@ private void serverRemovedSegment(DruidServerMetadata server, DataSegment segmen segment.getVersion() ); } else { + decrementAndGetLong(segmentAvailableCount, getMetricKey(segment)); runTimelineCallbacks(callback -> callback.segmentRemoved(segment)); } } @@ -449,14 +448,9 @@ public List getDruidServers() } @Override - public Map getTotalSuccessfulSegmentLoadCount() + public Map getAvailableSegmentCount() { - final Map total = CollectionUtils.mapValues(totalSuccessfulSegmentLoadCount, AtomicLong::get); - synchronized (totalSuccessfulSegmentLoadCount) { - final Map delta = getDeltaValues(total, previousSuccessfulSegmentLoadCount); - previousSuccessfulSegmentLoadCount = total; - return delta; - } + return CollectionUtils.mapValues(segmentAvailableCount, AtomicLong::get); } private static long incrementAndGetLong(ConcurrentHashMap counters, RowKey key) @@ -468,16 +462,17 @@ private static long incrementAndGetLong(ConcurrentHashMap co return counter.incrementAndGet(); } - private Map getDeltaValues(Map total, Map prev) + private static long decrementAndGetLong(ConcurrentHashMap counters, RowKey key) { - final Map deltaValues = new HashMap<>(); - total.forEach( - (dataSource, totalCount) -> deltaValues.put( - dataSource, - totalCount - prev.getOrDefault(dataSource, 0L) - ) - ); - return deltaValues; + AtomicLong counter = counters.get(key); + long cnt = 0L; + if (counter != null) { + cnt = counter.decrementAndGet(); + if (cnt == 0) { + counters.remove(key, counter); + } + } + return cnt; } private static RowKey getMetricKey(final DataSegment segment) @@ -485,6 +480,8 @@ private static RowKey getMetricKey(final DataSegment segment) if (segment == null) { return RowKey.empty(); } - return RowKey.with(Dimension.DATASOURCE, segment.getDataSource()).build(); + return RowKey.with(Dimension.DATASOURCE, segment.getDataSource()) + .with(Dimension.INTERVAL, String.valueOf(segment.getInterval())) + .and(Dimension.VERSION, segment.getVersion()); } } diff --git a/server/src/main/java/org/apache/druid/server/SegmentManager.java b/server/src/main/java/org/apache/druid/server/SegmentManager.java index 2a9477aaa7da..060bf285114b 100644 --- a/server/src/main/java/org/apache/druid/server/SegmentManager.java +++ b/server/src/main/java/org/apache/druid/server/SegmentManager.java @@ -369,6 +369,7 @@ public void shutdownBootstrap() cacheManager.shutdownBootstrap(); } + /** * Represent the state of a data source including the timeline, total segment size, and number of segments. */ diff --git a/server/src/main/java/org/apache/druid/server/coordinator/stats/Dimension.java b/server/src/main/java/org/apache/druid/server/coordinator/stats/Dimension.java index 7001f7fa4550..9d119b88a2f1 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/stats/Dimension.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/stats/Dimension.java @@ -30,7 +30,9 @@ public enum Dimension DUTY("duty"), DUTY_GROUP("dutyGroup"), DESCRIPTION("description"), - SERVER("server"); + SERVER("server"), + INTERVAL("interval"), + VERSION("version"); private final String reportedName; diff --git a/server/src/main/java/org/apache/druid/server/metrics/SegmentDiscoveryStatsMonitor.java b/server/src/main/java/org/apache/druid/server/metrics/BrokerSegmentCountStatsMonitor.java similarity index 80% rename from server/src/main/java/org/apache/druid/server/metrics/SegmentDiscoveryStatsMonitor.java rename to server/src/main/java/org/apache/druid/server/metrics/BrokerSegmentCountStatsMonitor.java index 92299b535a62..203b1fa51d1d 100644 --- a/server/src/main/java/org/apache/druid/server/metrics/SegmentDiscoveryStatsMonitor.java +++ b/server/src/main/java/org/apache/druid/server/metrics/BrokerSegmentCountStatsMonitor.java @@ -27,12 +27,15 @@ import java.util.Map; -public class SegmentDiscoveryStatsMonitor extends AbstractMonitor +/** + * {@Code BrokerSegmentCountStatsMonitor} tracks the currently available segments that broker has discovered. + */ +public class BrokerSegmentCountStatsMonitor extends AbstractMonitor { - private final SegmentDiscoveryStatsProvider statsProvider; + private final BrokerSegmentCountStatsProvider statsProvider; @Inject - public SegmentDiscoveryStatsMonitor(SegmentDiscoveryStatsProvider statsProvider) + public BrokerSegmentCountStatsMonitor(BrokerSegmentCountStatsProvider statsProvider) { this.statsProvider = statsProvider; } @@ -40,7 +43,7 @@ public SegmentDiscoveryStatsMonitor(SegmentDiscoveryStatsProvider statsProvider) @Override public boolean doMonitor(ServiceEmitter emitter) { - emit(emitter, "segment/discover/success", statsProvider.getTotalSuccessfulSegmentLoadCount()); + emit(emitter, "segment/available/count", statsProvider.getAvailableSegmentCount()); return true; } diff --git a/server/src/main/java/org/apache/druid/server/metrics/BrokerSegmentCountStatsProvider.java b/server/src/main/java/org/apache/druid/server/metrics/BrokerSegmentCountStatsProvider.java new file mode 100644 index 000000000000..71cdb447e132 --- /dev/null +++ b/server/src/main/java/org/apache/druid/server/metrics/BrokerSegmentCountStatsProvider.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.server.metrics; + +import org.apache.druid.server.coordinator.stats.RowKey; + +import java.util.Map; + +public interface BrokerSegmentCountStatsProvider +{ + /** + * Return the number of available segments discovered by broker for a datasource. + */ + Map getAvailableSegmentCount(); +} diff --git a/server/src/main/java/org/apache/druid/server/metrics/SegmentDiscoveryStatsProvider.java b/server/src/main/java/org/apache/druid/server/metrics/SegmentDiscoveryStatsProvider.java deleted file mode 100644 index 675e4f06b70e..000000000000 --- a/server/src/main/java/org/apache/druid/server/metrics/SegmentDiscoveryStatsProvider.java +++ /dev/null @@ -1,13 +0,0 @@ -package org.apache.druid.server.metrics; - -import org.apache.druid.server.coordinator.stats.RowKey; - -import java.util.Map; - -public interface SegmentDiscoveryStatsProvider -{ - /** - * Return the number of successful segment loads for each datasource during the emission period. - */ - Map getTotalSuccessfulSegmentLoadCount(); -} diff --git a/server/src/test/java/org/apache/druid/client/BrokerServerViewTest.java b/server/src/test/java/org/apache/druid/client/BrokerServerViewTest.java index 1c65e2be2beb..e510609036ad 100644 --- a/server/src/test/java/org/apache/druid/client/BrokerServerViewTest.java +++ b/server/src/test/java/org/apache/druid/client/BrokerServerViewTest.java @@ -47,6 +47,8 @@ import org.apache.druid.server.coordination.DruidServerMetadata; import org.apache.druid.server.coordination.ServerType; import org.apache.druid.server.coordination.TestCoordinatorClient; +import org.apache.druid.server.coordinator.stats.Dimension; +import org.apache.druid.server.coordinator.stats.RowKey; import org.apache.druid.server.initialization.ZkPathsConfig; import org.apache.druid.server.metrics.NoopServiceEmitter; import org.apache.druid.timeline.DataSegment; @@ -64,12 +66,16 @@ import java.util.Arrays; import java.util.Collections; +import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.Set; import java.util.concurrent.CountDownLatch; import java.util.concurrent.Executor; import java.util.stream.Collectors; +import static org.apache.druid.server.coordinator.stats.Dimension.INTERVAL; + public class BrokerServerViewTest extends CuratorTestBase { private final ObjectMapper jsonMapper; @@ -116,6 +122,17 @@ public void testSingleServerAddedRemovedSegment() throws Exception announceSegmentForServer(druidServer, segment, zkPathsConfig, jsonMapper); Assert.assertTrue(timing.forWaiting().awaitLatch(segmentViewInitLatch)); Assert.assertTrue(timing.forWaiting().awaitLatch(segmentAddedLatch)); + Map availableSegmentCount = brokerServerView.getAvailableSegmentCount(); + for (Map.Entry entry : availableSegmentCount.entrySet()) { + Assert.assertEquals( + RowKey.with(Dimension.DATASOURCE, segment.getDataSource()) + .with(Dimension.INTERVAL, segment.getInterval().toString()) + .with(Dimension.VERSION, segment.getVersion()) + .build(), + entry.getKey() + ); + Assert.assertEquals(1L, (long) entry.getValue()); + } TimelineLookup timeline = brokerServerView.getTimeline( new TableDataSource("test_broker_server_view") @@ -144,6 +161,7 @@ public void testSingleServerAddedRemovedSegment() throws Exception 0, timeline.lookup(intervals).size() ); + Assert.assertEquals(0, brokerServerView.getAvailableSegmentCount().size()); Assert.assertNull(timeline.findChunk(intervals, "v1", partition)); } @@ -194,6 +212,18 @@ public void testMultipleServerAddedRemovedSegment() throws Exception ) ) ); + Map availableSegmentCount = brokerServerView.getAvailableSegmentCount(); + Map expectedSegmentCount = new HashMap<>(); + for (DataSegment segment : segments) { + expectedSegmentCount.put( + RowKey.with(Dimension.DATASOURCE, segment.getDataSource()) + .with(INTERVAL, segment.getInterval().toString()) + .with(Dimension.VERSION, segment.getVersion()) + .build(), + 1L + ); + } + Assert.assertEquals(expectedSegmentCount, availableSegmentCount); // unannounce the segment created by dataSegmentWithIntervalAndVersion("2011-04-01/2011-04-09", "v2") unannounceSegmentForServer(druidServers.get(2), segments.get(2), zkPathsConfig); diff --git a/services/src/main/java/org/apache/druid/cli/CliBroker.java b/services/src/main/java/org/apache/druid/cli/CliBroker.java index 5031cea6c046..237ba401240f 100644 --- a/services/src/main/java/org/apache/druid/cli/CliBroker.java +++ b/services/src/main/java/org/apache/druid/cli/CliBroker.java @@ -75,8 +75,8 @@ import org.apache.druid.server.http.SegmentListerResource; import org.apache.druid.server.http.SelfDiscoveryResource; import org.apache.druid.server.initialization.jetty.JettyServerInitializer; +import org.apache.druid.server.metrics.BrokerSegmentCountStatsProvider; import org.apache.druid.server.metrics.QueryCountStatsProvider; -import org.apache.druid.server.metrics.SegmentDiscoveryStatsProvider; import org.apache.druid.server.metrics.SubqueryCountStatsProvider; import org.apache.druid.server.router.TieredBrokerConfig; import org.apache.druid.sql.calcite.schema.MetadataSegmentView; @@ -142,7 +142,7 @@ protected List getModules() LifecycleModule.register(binder, MetadataSegmentView.class); binder.bind(TimelineServerView.class).to(BrokerServerView.class).in(LazySingleton.class); binder.bind(QueryableDruidServer.Maker.class).to(DirectDruidClientFactory.class).in(LazySingleton.class); - binder.bind(SegmentDiscoveryStatsProvider.class).to(BrokerServerView.class); + binder.bind(BrokerSegmentCountStatsProvider.class).to(BrokerServerView.class); JsonConfigProvider.bind(binder, "druid.broker.cache", CacheConfig.class); binder.install(new CacheModule()); From a1123bff5e100b32f8158151940cf6791649267f Mon Sep 17 00:00:00 2001 From: Uddeshya Singh Date: Thu, 24 Jul 2025 17:26:49 +0530 Subject: [PATCH 03/14] Fix jacoco coverage --- .../BrokerSegmentCountStatsMonitorTest.java | 107 ++++++++++++++++++ 1 file changed, 107 insertions(+) create mode 100644 server/src/test/java/org/apache/druid/server/metrics/BrokerSegmentCountStatsMonitorTest.java diff --git a/server/src/test/java/org/apache/druid/server/metrics/BrokerSegmentCountStatsMonitorTest.java b/server/src/test/java/org/apache/druid/server/metrics/BrokerSegmentCountStatsMonitorTest.java new file mode 100644 index 000000000000..45d036adf290 --- /dev/null +++ b/server/src/test/java/org/apache/druid/server/metrics/BrokerSegmentCountStatsMonitorTest.java @@ -0,0 +1,107 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.server.metrics; + +import com.google.common.collect.ImmutableMap; +import org.apache.druid.java.util.metrics.StubServiceEmitter; +import org.apache.druid.server.coordinator.stats.Dimension; +import org.apache.druid.server.coordinator.stats.RowKey; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import java.util.Map; + +public class BrokerSegmentCountStatsMonitorTest +{ + private BrokerSegmentCountStatsProvider statsProvider; + private static final RowKey SEGMENT_METRIC_KEY1 = RowKey.with(Dimension.DATASOURCE, "dataSource1") + .with(Dimension.VERSION, "2024-01-01T00:00:00.000Z") + .with(Dimension.INTERVAL, "2024-01-01T00:00:00.000Z/2024-01-02T00:00:00.000Z") + .build(); + private static final RowKey SEGMENT_METRIC_KEY2 = RowKey.with(Dimension.DATASOURCE, "dataSource2") + .with(Dimension.VERSION, "2024-01-02T00:00:00.000Z") + .with(Dimension.INTERVAL, "2024-01-02T00:00:00.000Z/2024-01-03T00:00:00.000Z") + .build(); + + @Before + public void setUp() + { + statsProvider = new BrokerSegmentCountStatsProvider() + { + @Override + public Map getAvailableSegmentCount() + { + return ImmutableMap.of(SEGMENT_METRIC_KEY1, 10L, SEGMENT_METRIC_KEY2, 5L); + } + }; + } + + @Test + public void testMonitor() + { + final BrokerSegmentCountStatsMonitor monitor = new BrokerSegmentCountStatsMonitor(statsProvider); + final StubServiceEmitter emitter = new StubServiceEmitter("service", "host"); + Assert.assertTrue(monitor.doMonitor(emitter)); + + Assert.assertEquals(2, emitter.getNumEmittedEvents()); + + emitter.verifyValue("segment/available/count", Map.of("dataSource", "dataSource1", "version", "2024-01-01T00:00:00.000Z", "interval", "2024-01-01T00:00:00.000Z/2024-01-02T00:00:00.000Z"), 10L); + emitter.verifyValue("segment/available/count", Map.of("dataSource", "dataSource2", "version", "2024-01-02T00:00:00.000Z", "interval", "2024-01-02T00:00:00.000Z/2024-01-03T00:00:00.000Z"), 5L); + } + + @Test + public void testMonitorWithNullCounts() + { + final BrokerSegmentCountStatsProvider nullStatsProvider = new BrokerSegmentCountStatsProvider() + { + @Override + public Map getAvailableSegmentCount() + { + return null; + } + }; + + final BrokerSegmentCountStatsMonitor monitor = new BrokerSegmentCountStatsMonitor(nullStatsProvider); + final StubServiceEmitter emitter = new StubServiceEmitter("service", "host"); + Assert.assertTrue(monitor.doMonitor(emitter)); + + Assert.assertEquals(0, emitter.getNumEmittedEvents()); + } + + @Test + public void testMonitorWithEmptyCounts() + { + final BrokerSegmentCountStatsProvider emptyStatsProvider = new BrokerSegmentCountStatsProvider() + { + @Override + public Map getAvailableSegmentCount() + { + return ImmutableMap.of(); + } + }; + + final BrokerSegmentCountStatsMonitor monitor = new BrokerSegmentCountStatsMonitor(emptyStatsProvider); + final StubServiceEmitter emitter = new StubServiceEmitter("service", "host"); + Assert.assertTrue(monitor.doMonitor(emitter)); + + Assert.assertEquals(0, emitter.getNumEmittedEvents()); + } +} From 02f6d3959ae1a1b95ea43b6096bef109882b1914 Mon Sep 17 00:00:00 2001 From: Uddeshya Singh Date: Thu, 24 Jul 2025 22:01:18 +0530 Subject: [PATCH 04/14] Address review comments --- docs/configuration/index.md | 1 + .../indexing/EmbeddedIndexTaskTest.java | 5 +- .../EmbeddedKafkaClusterMetricsTest.java | 1 - .../msq/EmbeddedMSQRealtimeQueryTest.java | 7 +-- .../server/EmbeddedHighAvailabilityTest.java | 8 --- .../apache/druid/client/BrokerServerView.java | 37 +++++--------- .../BrokerSegmentCountStatsMonitor.java | 2 +- .../BrokerSegmentCountStatsProvider.java | 2 +- .../druid/client/BrokerServerViewTest.java | 4 +- .../BrokerSegmentCountStatsMonitorTest.java | 49 ++++--------------- 10 files changed, 31 insertions(+), 85 deletions(-) diff --git a/docs/configuration/index.md b/docs/configuration/index.md index 0976190cf99a..4c700d784c9b 100644 --- a/docs/configuration/index.md +++ b/docs/configuration/index.md @@ -1960,6 +1960,7 @@ The following table lists the monitors that are available and the services you c |`org.apache.druid.server.metrics.WorkerTaskCountStatsMonitor`|Reports how many ingestion tasks are currently running/pending/waiting, the number of successful/failed tasks, and metrics about task slot usage for the reporting worker, per emission period. |MiddleManager, Indexer| |`org.apache.druid.server.metrics.ServiceStatusMonitor`|Reports a heartbeat for the service.|Any| |`org.apache.druid.server.metrics.GroupByStatsMonitor`|Report metrics for groupBy queries like disk and merge buffer utilization. |Broker, Historical, Indexer, Peon| +|`org.apache.druid.server.metrics.BrokerSegmentCountStatsMonitor`| Report the number of segments of a datasource currently queryable by a Broker.|Broker| For example, if you only wanted monitors on all services for system and JVM information, you'd add the following to `common.runtime.properties`: diff --git a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/indexing/EmbeddedIndexTaskTest.java b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/indexing/EmbeddedIndexTaskTest.java index 926f7ad35ce1..cedca675b67a 100644 --- a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/indexing/EmbeddedIndexTaskTest.java +++ b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/indexing/EmbeddedIndexTaskTest.java @@ -109,9 +109,10 @@ public void test_runIndexTask_forInlineDatasource() .hasDimension(DruidMetrics.DATASOURCE, dataSource), agg -> agg.hasSumAtLeast(10) ); - broker.latchableEmitter().waitForEvent( + broker.latchableEmitter().waitForEventAggregate( event -> event.hasMetricName("segment/available/count") - .hasDimension(DruidMetrics.DATASOURCE, dataSource) + .hasDimension(DruidMetrics.DATASOURCE, dataSource), + agg -> agg.hasCountAtLeast(10) ); Assertions.assertEquals(Resources.CSV_DATA_10_DAYS, cluster.runSql("SELECT * FROM %s", dataSource)); Assertions.assertEquals("10", cluster.runSql("SELECT COUNT(*) FROM %s", dataSource)); diff --git a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/indexing/EmbeddedKafkaClusterMetricsTest.java b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/indexing/EmbeddedKafkaClusterMetricsTest.java index 27112691dac8..8d10bae6e4c1 100644 --- a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/indexing/EmbeddedKafkaClusterMetricsTest.java +++ b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/indexing/EmbeddedKafkaClusterMetricsTest.java @@ -113,7 +113,6 @@ public void stop() .addProperty("druid.manager.segments.killUnused.bufferPeriod", "PT0.1s") .addProperty("druid.manager.segments.killUnused.dutyPeriod", "PT1s"); coordinator.addProperty("druid.manager.segments.useIncrementalCache", "ifSynced"); - broker.addProperty("druid.monitoring.monitors", "[\"org.apache.druid.server.metrics.BrokerSegmentCountStatsMonitor\"]"); cluster.addExtension(KafkaIndexTaskModule.class) .addExtension(KafkaEmitterModule.class) .addExtension(LatchableEmitterModule.class) diff --git a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/msq/EmbeddedMSQRealtimeQueryTest.java b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/msq/EmbeddedMSQRealtimeQueryTest.java index e0f1ce0819d8..7b9f44e35f59 100644 --- a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/msq/EmbeddedMSQRealtimeQueryTest.java +++ b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/msq/EmbeddedMSQRealtimeQueryTest.java @@ -178,10 +178,11 @@ void setUpEach() .hasDimension(DruidMetrics.DATASOURCE, Collections.singletonList(dataSource)), agg -> agg.hasSumAtLeast(totalRows) ); - broker.latchableEmitter().waitForEventAggregate( + broker.latchableEmitter().waitForEvent( event -> event.hasMetricName("segment/available/count") - .hasDimension(DruidMetrics.DATASOURCE, dataSource), - agg -> agg.hasCountAtLeast(10) + .hasDimension(DruidMetrics.DATASOURCE, dataSource) + .hasDimension(DruidMetrics.INTERVAL, "2015-09-12T00:00:00.000Z/2015-09-13T00:00:00.000Z") + .hasValue(1) ); } diff --git a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/server/EmbeddedHighAvailabilityTest.java b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/server/EmbeddedHighAvailabilityTest.java index bd3183fa7184..2f5a0cfb173f 100644 --- a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/server/EmbeddedHighAvailabilityTest.java +++ b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/server/EmbeddedHighAvailabilityTest.java @@ -31,7 +31,6 @@ import org.apache.druid.java.util.http.client.Request; import org.apache.druid.java.util.http.client.response.StatusResponseHandler; import org.apache.druid.java.util.http.client.response.StatusResponseHolder; -import org.apache.druid.query.DruidMetrics; import org.apache.druid.testing.embedded.EmbeddedBroker; import org.apache.druid.testing.embedded.EmbeddedClusterApis; import org.apache.druid.testing.embedded.EmbeddedCoordinator; @@ -72,9 +71,6 @@ protected EmbeddedDruidCluster createCluster() overlord1.addProperty("druid.plaintextPort", "7090"); coordinator1.addProperty("druid.plaintextPort", "7081"); - broker.addProperty("druid.monitoring.monitors", "[\"org.apache.druid.server.metrics.BrokerSegmentCountStatsMonitor\"]") - .addProperty("druid.monitoring.emissionPeriod", "PT0.1s"); - return EmbeddedDruidCluster.withEmbeddedDerbyAndZookeeper() .useLatchableEmitter() .addServer(coordinator1) @@ -123,10 +119,6 @@ public void test_switchLeader_andVerifyUsingSysTables() // Run sys queries, switch leaders, repeat ServerPair overlordPair = createServerPair(overlord1, overlord2); ServerPair coordinatorPair = createServerPair(coordinator1, coordinator2); - broker.latchableEmitter().waitForEvent( - event -> event.hasMetricName("segment/available/count") - .hasDimension(DruidMetrics.DATASOURCE, dataSource) - ); for (int i = 0; i < 3; ++i) { Assertions.assertEquals( "1", diff --git a/server/src/main/java/org/apache/druid/client/BrokerServerView.java b/server/src/main/java/org/apache/druid/client/BrokerServerView.java index be23d55ac2b1..317af1a94492 100644 --- a/server/src/main/java/org/apache/druid/client/BrokerServerView.java +++ b/server/src/main/java/org/apache/druid/client/BrokerServerView.java @@ -44,7 +44,6 @@ import org.apache.druid.timeline.SegmentId; import org.apache.druid.timeline.VersionedIntervalTimeline; import org.apache.druid.timeline.partition.PartitionChunk; -import org.apache.druid.utils.CollectionUtils; import java.util.ArrayList; import java.util.HashMap; @@ -56,7 +55,6 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.Executor; import java.util.concurrent.ExecutorService; -import java.util.concurrent.atomic.AtomicLong; import java.util.function.Function; import java.util.stream.Collectors; @@ -81,8 +79,7 @@ public class BrokerServerView implements TimelineServerView, BrokerSegmentCountS private final CountDownLatch initialized = new CountDownLatch(1); private final FilteredServerInventoryView baseView; private final BrokerViewOfCoordinatorConfig brokerViewOfCoordinatorConfig; - - private final ConcurrentHashMap segmentAvailableCount = new ConcurrentHashMap<>(); + private final ConcurrentHashMap segmentAvailableCount = new ConcurrentHashMap<>(); @Inject public BrokerServerView( @@ -289,7 +286,7 @@ private void serverAddedSegment(final DruidServerMetadata server, final DataSegm timeline.add(segment.getInterval(), segment.getVersion(), segment.getShardSpec().createChunk(selector)); selectors.put(segmentId, selector); - incrementAndGetLong(segmentAvailableCount, getMetricKey(segment)); + incrementSegmentCount(getMetricKey(segment)); } QueryableDruidServer queryableDruidServer = clients.get(server.getName()); @@ -366,7 +363,7 @@ private void serverRemovedSegment(DruidServerMetadata server, DataSegment segmen segment.getVersion() ); } else { - decrementAndGetLong(segmentAvailableCount, getMetricKey(segment)); + decrementSegmentCount(getMetricKey(segment)); runTimelineCallbacks(callback -> callback.segmentRemoved(segment)); } } @@ -450,36 +447,24 @@ public List getDruidServers() @Override public Map getAvailableSegmentCount() { - return CollectionUtils.mapValues(segmentAvailableCount, AtomicLong::get); + return segmentAvailableCount; } - private static long incrementAndGetLong(ConcurrentHashMap counters, RowKey key) + private void incrementSegmentCount(RowKey key) { - AtomicLong counter = counters.get(key); - if (counter == null) { - counter = counters.computeIfAbsent(key, k -> new AtomicLong()); - } - return counter.incrementAndGet(); + segmentAvailableCount.compute(key, (k, currentValue) -> currentValue == null ? 1 : currentValue + 1); } - private static long decrementAndGetLong(ConcurrentHashMap counters, RowKey key) + private void decrementSegmentCount(RowKey key) { - AtomicLong counter = counters.get(key); - long cnt = 0L; - if (counter != null) { - cnt = counter.decrementAndGet(); - if (cnt == 0) { - counters.remove(key, counter); - } - } - return cnt; + segmentAvailableCount.computeIfPresent(key, (k, currentValue) -> { + long newValue = currentValue - 1; + return newValue > 0 ? newValue : null; + }); } private static RowKey getMetricKey(final DataSegment segment) { - if (segment == null) { - return RowKey.empty(); - } return RowKey.with(Dimension.DATASOURCE, segment.getDataSource()) .with(Dimension.INTERVAL, String.valueOf(segment.getInterval())) .and(Dimension.VERSION, segment.getVersion()); diff --git a/server/src/main/java/org/apache/druid/server/metrics/BrokerSegmentCountStatsMonitor.java b/server/src/main/java/org/apache/druid/server/metrics/BrokerSegmentCountStatsMonitor.java index 203b1fa51d1d..31f37f7247d8 100644 --- a/server/src/main/java/org/apache/druid/server/metrics/BrokerSegmentCountStatsMonitor.java +++ b/server/src/main/java/org/apache/druid/server/metrics/BrokerSegmentCountStatsMonitor.java @@ -28,7 +28,7 @@ import java.util.Map; /** - * {@Code BrokerSegmentCountStatsMonitor} tracks the currently available segments that broker has discovered. + * Monitor that tracks the number of segments of a datasource currently queryable by this Broker. */ public class BrokerSegmentCountStatsMonitor extends AbstractMonitor { diff --git a/server/src/main/java/org/apache/druid/server/metrics/BrokerSegmentCountStatsProvider.java b/server/src/main/java/org/apache/druid/server/metrics/BrokerSegmentCountStatsProvider.java index 71cdb447e132..7ee297a07630 100644 --- a/server/src/main/java/org/apache/druid/server/metrics/BrokerSegmentCountStatsProvider.java +++ b/server/src/main/java/org/apache/druid/server/metrics/BrokerSegmentCountStatsProvider.java @@ -26,7 +26,7 @@ public interface BrokerSegmentCountStatsProvider { /** - * Return the number of available segments discovered by broker for a datasource. + * Return the number of segments queryable by a Broker for a datasource, interval and version. */ Map getAvailableSegmentCount(); } diff --git a/server/src/test/java/org/apache/druid/client/BrokerServerViewTest.java b/server/src/test/java/org/apache/druid/client/BrokerServerViewTest.java index e510609036ad..8c2d8b16486e 100644 --- a/server/src/test/java/org/apache/druid/client/BrokerServerViewTest.java +++ b/server/src/test/java/org/apache/druid/client/BrokerServerViewTest.java @@ -74,8 +74,6 @@ import java.util.concurrent.Executor; import java.util.stream.Collectors; -import static org.apache.druid.server.coordinator.stats.Dimension.INTERVAL; - public class BrokerServerViewTest extends CuratorTestBase { private final ObjectMapper jsonMapper; @@ -217,7 +215,7 @@ public void testMultipleServerAddedRemovedSegment() throws Exception for (DataSegment segment : segments) { expectedSegmentCount.put( RowKey.with(Dimension.DATASOURCE, segment.getDataSource()) - .with(INTERVAL, segment.getInterval().toString()) + .with(Dimension.INTERVAL, segment.getInterval().toString()) .with(Dimension.VERSION, segment.getVersion()) .build(), 1L diff --git a/server/src/test/java/org/apache/druid/server/metrics/BrokerSegmentCountStatsMonitorTest.java b/server/src/test/java/org/apache/druid/server/metrics/BrokerSegmentCountStatsMonitorTest.java index 45d036adf290..dd1043478f90 100644 --- a/server/src/test/java/org/apache/druid/server/metrics/BrokerSegmentCountStatsMonitorTest.java +++ b/server/src/test/java/org/apache/druid/server/metrics/BrokerSegmentCountStatsMonitorTest.java @@ -19,12 +19,10 @@ package org.apache.druid.server.metrics; -import com.google.common.collect.ImmutableMap; import org.apache.druid.java.util.metrics.StubServiceEmitter; import org.apache.druid.server.coordinator.stats.Dimension; import org.apache.druid.server.coordinator.stats.RowKey; import org.junit.Assert; -import org.junit.Before; import org.junit.Test; import java.util.Map; @@ -41,23 +39,12 @@ public class BrokerSegmentCountStatsMonitorTest .with(Dimension.INTERVAL, "2024-01-02T00:00:00.000Z/2024-01-03T00:00:00.000Z") .build(); - @Before - public void setUp() - { - statsProvider = new BrokerSegmentCountStatsProvider() - { - @Override - public Map getAvailableSegmentCount() - { - return ImmutableMap.of(SEGMENT_METRIC_KEY1, 10L, SEGMENT_METRIC_KEY2, 5L); - } - }; - } - @Test - public void testMonitor() + public void test_monitor() { - final BrokerSegmentCountStatsMonitor monitor = new BrokerSegmentCountStatsMonitor(statsProvider); + final BrokerSegmentCountStatsMonitor monitor = new BrokerSegmentCountStatsMonitor( + () -> Map.of(SEGMENT_METRIC_KEY1, 10L, SEGMENT_METRIC_KEY2, 5L) + ); final StubServiceEmitter emitter = new StubServiceEmitter("service", "host"); Assert.assertTrue(monitor.doMonitor(emitter)); @@ -68,37 +55,19 @@ public void testMonitor() } @Test - public void testMonitorWithNullCounts() + public void test_monitor_withNullCounts() { - final BrokerSegmentCountStatsProvider nullStatsProvider = new BrokerSegmentCountStatsProvider() - { - @Override - public Map getAvailableSegmentCount() - { - return null; - } - }; - - final BrokerSegmentCountStatsMonitor monitor = new BrokerSegmentCountStatsMonitor(nullStatsProvider); - final StubServiceEmitter emitter = new StubServiceEmitter("service", "host"); + final BrokerSegmentCountStatsMonitor monitor = new BrokerSegmentCountStatsMonitor(() -> null); + final StubServiceEmitter emitter = new StubServiceEmitter(); Assert.assertTrue(monitor.doMonitor(emitter)); Assert.assertEquals(0, emitter.getNumEmittedEvents()); } @Test - public void testMonitorWithEmptyCounts() + public void test_monitor_withEmptyCounts() { - final BrokerSegmentCountStatsProvider emptyStatsProvider = new BrokerSegmentCountStatsProvider() - { - @Override - public Map getAvailableSegmentCount() - { - return ImmutableMap.of(); - } - }; - - final BrokerSegmentCountStatsMonitor monitor = new BrokerSegmentCountStatsMonitor(emptyStatsProvider); + final BrokerSegmentCountStatsMonitor monitor = new BrokerSegmentCountStatsMonitor(() -> Map.of()); final StubServiceEmitter emitter = new StubServiceEmitter("service", "host"); Assert.assertTrue(monitor.doMonitor(emitter)); From eb7d8bb7d27144f89214453e755ac5eca39a4361 Mon Sep 17 00:00:00 2001 From: Uddeshya Singh Date: Fri, 25 Jul 2025 06:19:25 +0530 Subject: [PATCH 05/14] Attempt fixing embedded tests --- .../testing/embedded/indexing/EmbeddedIndexTaskTest.java | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/indexing/EmbeddedIndexTaskTest.java b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/indexing/EmbeddedIndexTaskTest.java index cedca675b67a..8daaa90803cd 100644 --- a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/indexing/EmbeddedIndexTaskTest.java +++ b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/indexing/EmbeddedIndexTaskTest.java @@ -85,7 +85,11 @@ public void test_runIndexTask_forInlineDatasource() cluster.callApi().onLeaderOverlord(o -> o.runTask(taskId, task)); cluster.callApi().waitForTaskToSucceed(taskId, overlord); - + broker.latchableEmitter().waitForEventAggregate( + event -> event.hasMetricName("segment/available/count") + .hasDimension(DruidMetrics.DATASOURCE, dataSource), + agg -> agg.hasSumAtLeast(10) + ); // Verify that the task created 10 DAY-granularity segments final List segments = new ArrayList<>( overlord.bindings().segmentsMetadataStorage().retrieveAllUsedSegments(dataSource, null) @@ -112,7 +116,7 @@ public void test_runIndexTask_forInlineDatasource() broker.latchableEmitter().waitForEventAggregate( event -> event.hasMetricName("segment/available/count") .hasDimension(DruidMetrics.DATASOURCE, dataSource), - agg -> agg.hasCountAtLeast(10) + agg -> agg.hasSumAtLeast(10) ); Assertions.assertEquals(Resources.CSV_DATA_10_DAYS, cluster.runSql("SELECT * FROM %s", dataSource)); Assertions.assertEquals("10", cluster.runSql("SELECT COUNT(*) FROM %s", dataSource)); From 1fe9e5240b6f9c4c1c8e8d4e132db16027276010 Mon Sep 17 00:00:00 2001 From: Uddeshya Singh Date: Fri, 25 Jul 2025 11:17:10 +0530 Subject: [PATCH 06/14] Fix embedded tests for minio --- docs/configuration/index.md | 2 +- .../embedded/indexing/EmbeddedIndexTaskTest.java | 6 +++--- .../embedded/msq/EmbeddedMSQRealtimeQueryTest.java | 2 +- .../java/org/apache/druid/client/BrokerServerView.java | 4 ++-- ...tatsMonitor.java => BrokerSegmentStatsMonitor.java} | 9 ++++++--- ...tsProvider.java => BrokerSegmentStatsProvider.java} | 2 +- ...torTest.java => BrokerSegmentStatsMonitorTest.java} | 10 +++++----- .../src/main/java/org/apache/druid/cli/CliBroker.java | 4 ++-- 8 files changed, 21 insertions(+), 18 deletions(-) rename server/src/main/java/org/apache/druid/server/metrics/{BrokerSegmentCountStatsMonitor.java => BrokerSegmentStatsMonitor.java} (84%) rename server/src/main/java/org/apache/druid/server/metrics/{BrokerSegmentCountStatsProvider.java => BrokerSegmentStatsProvider.java} (95%) rename server/src/test/java/org/apache/druid/server/metrics/{BrokerSegmentCountStatsMonitorTest.java => BrokerSegmentStatsMonitorTest.java} (88%) diff --git a/docs/configuration/index.md b/docs/configuration/index.md index 4c700d784c9b..2269625936b3 100644 --- a/docs/configuration/index.md +++ b/docs/configuration/index.md @@ -1960,7 +1960,7 @@ The following table lists the monitors that are available and the services you c |`org.apache.druid.server.metrics.WorkerTaskCountStatsMonitor`|Reports how many ingestion tasks are currently running/pending/waiting, the number of successful/failed tasks, and metrics about task slot usage for the reporting worker, per emission period. |MiddleManager, Indexer| |`org.apache.druid.server.metrics.ServiceStatusMonitor`|Reports a heartbeat for the service.|Any| |`org.apache.druid.server.metrics.GroupByStatsMonitor`|Report metrics for groupBy queries like disk and merge buffer utilization. |Broker, Historical, Indexer, Peon| -|`org.apache.druid.server.metrics.BrokerSegmentCountStatsMonitor`| Report the number of segments of a datasource currently queryable by a Broker.|Broker| +|`org.apache.druid.server.metrics.BrokerSegmentStatsMonitor`| Report the number of segments of a datasource currently queryable by a Broker.|Broker| For example, if you only wanted monitors on all services for system and JVM information, you'd add the following to `common.runtime.properties`: diff --git a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/indexing/EmbeddedIndexTaskTest.java b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/indexing/EmbeddedIndexTaskTest.java index 8daaa90803cd..197d0ab5fed4 100644 --- a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/indexing/EmbeddedIndexTaskTest.java +++ b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/indexing/EmbeddedIndexTaskTest.java @@ -52,7 +52,9 @@ */ public class EmbeddedIndexTaskTest extends EmbeddedClusterTestBase { - protected final EmbeddedBroker broker = new EmbeddedBroker(); + protected final EmbeddedBroker broker = new EmbeddedBroker() + .addProperty("druid.monitoring.monitors", "[\"org.apache.druid.server.metrics.BrokerSegmentStatsMonitor\"]") + .addProperty("druid.monitoring.emissionPeriod", "PT0.1s"); protected final EmbeddedIndexer indexer = new EmbeddedIndexer().addProperty("druid.worker.capacity", "25"); protected final EmbeddedOverlord overlord = new EmbeddedOverlord(); protected final EmbeddedHistorical historical = new EmbeddedHistorical(); @@ -61,10 +63,8 @@ public class EmbeddedIndexTaskTest extends EmbeddedClusterTestBase @Override public EmbeddedDruidCluster createCluster() { - broker.addProperty("druid.monitoring.monitors", "[\"org.apache.druid.server.metrics.BrokerSegmentCountStatsMonitor\"]"); return EmbeddedDruidCluster.withEmbeddedDerbyAndZookeeper() .useLatchableEmitter() - .addCommonProperty("druid.monitoring.emissionPeriod", "PT0.1s") .addServer(coordinator) .addServer(indexer) .addServer(overlord) diff --git a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/msq/EmbeddedMSQRealtimeQueryTest.java b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/msq/EmbeddedMSQRealtimeQueryTest.java index 7b9f44e35f59..27bdf268525c 100644 --- a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/msq/EmbeddedMSQRealtimeQueryTest.java +++ b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/msq/EmbeddedMSQRealtimeQueryTest.java @@ -106,7 +106,7 @@ public EmbeddedDruidCluster createCluster() broker.addProperty("druid.msq.dart.controller.heapFraction", "0.9") .addProperty("druid.query.default.context.maxConcurrentStages", "1") .addProperty("druid.monitoring.emissionPeriod", "PT0.1s") - .addProperty("druid.monitoring.monitors", "[\"org.apache.druid.server.metrics.BrokerSegmentCountStatsMonitor\"]"); + .addProperty("druid.monitoring.monitors", "[\"org.apache.druid.server.metrics.BrokerSegmentStatsMonitor\"]"); historical.addProperty("druid.msq.dart.worker.heapFraction", "0.9") .addProperty("druid.msq.dart.worker.concurrentQueries", "1") diff --git a/server/src/main/java/org/apache/druid/client/BrokerServerView.java b/server/src/main/java/org/apache/druid/client/BrokerServerView.java index 317af1a94492..c68648ae0706 100644 --- a/server/src/main/java/org/apache/druid/client/BrokerServerView.java +++ b/server/src/main/java/org/apache/druid/client/BrokerServerView.java @@ -39,7 +39,7 @@ import org.apache.druid.server.coordination.ServerType; import org.apache.druid.server.coordinator.stats.Dimension; import org.apache.druid.server.coordinator.stats.RowKey; -import org.apache.druid.server.metrics.BrokerSegmentCountStatsProvider; +import org.apache.druid.server.metrics.BrokerSegmentStatsProvider; import org.apache.druid.timeline.DataSegment; import org.apache.druid.timeline.SegmentId; import org.apache.druid.timeline.VersionedIntervalTimeline; @@ -62,7 +62,7 @@ * */ @ManageLifecycle -public class BrokerServerView implements TimelineServerView, BrokerSegmentCountStatsProvider +public class BrokerServerView implements TimelineServerView, BrokerSegmentStatsProvider { private static final Logger log = new Logger(BrokerServerView.class); diff --git a/server/src/main/java/org/apache/druid/server/metrics/BrokerSegmentCountStatsMonitor.java b/server/src/main/java/org/apache/druid/server/metrics/BrokerSegmentStatsMonitor.java similarity index 84% rename from server/src/main/java/org/apache/druid/server/metrics/BrokerSegmentCountStatsMonitor.java rename to server/src/main/java/org/apache/druid/server/metrics/BrokerSegmentStatsMonitor.java index 31f37f7247d8..01713bd454d4 100644 --- a/server/src/main/java/org/apache/druid/server/metrics/BrokerSegmentCountStatsMonitor.java +++ b/server/src/main/java/org/apache/druid/server/metrics/BrokerSegmentStatsMonitor.java @@ -20,6 +20,8 @@ package org.apache.druid.server.metrics; import com.google.inject.Inject; +import org.apache.druid.discovery.NodeRole; +import org.apache.druid.guice.annotations.LoadScope; import org.apache.druid.java.util.emitter.service.ServiceEmitter; import org.apache.druid.java.util.emitter.service.ServiceMetricEvent; import org.apache.druid.java.util.metrics.AbstractMonitor; @@ -30,12 +32,13 @@ /** * Monitor that tracks the number of segments of a datasource currently queryable by this Broker. */ -public class BrokerSegmentCountStatsMonitor extends AbstractMonitor +@LoadScope(roles = {NodeRole.BROKER_JSON_NAME}) +public class BrokerSegmentStatsMonitor extends AbstractMonitor { - private final BrokerSegmentCountStatsProvider statsProvider; + private final BrokerSegmentStatsProvider statsProvider; @Inject - public BrokerSegmentCountStatsMonitor(BrokerSegmentCountStatsProvider statsProvider) + public BrokerSegmentStatsMonitor(BrokerSegmentStatsProvider statsProvider) { this.statsProvider = statsProvider; } diff --git a/server/src/main/java/org/apache/druid/server/metrics/BrokerSegmentCountStatsProvider.java b/server/src/main/java/org/apache/druid/server/metrics/BrokerSegmentStatsProvider.java similarity index 95% rename from server/src/main/java/org/apache/druid/server/metrics/BrokerSegmentCountStatsProvider.java rename to server/src/main/java/org/apache/druid/server/metrics/BrokerSegmentStatsProvider.java index 7ee297a07630..d2230090dc6d 100644 --- a/server/src/main/java/org/apache/druid/server/metrics/BrokerSegmentCountStatsProvider.java +++ b/server/src/main/java/org/apache/druid/server/metrics/BrokerSegmentStatsProvider.java @@ -23,7 +23,7 @@ import java.util.Map; -public interface BrokerSegmentCountStatsProvider +public interface BrokerSegmentStatsProvider { /** * Return the number of segments queryable by a Broker for a datasource, interval and version. diff --git a/server/src/test/java/org/apache/druid/server/metrics/BrokerSegmentCountStatsMonitorTest.java b/server/src/test/java/org/apache/druid/server/metrics/BrokerSegmentStatsMonitorTest.java similarity index 88% rename from server/src/test/java/org/apache/druid/server/metrics/BrokerSegmentCountStatsMonitorTest.java rename to server/src/test/java/org/apache/druid/server/metrics/BrokerSegmentStatsMonitorTest.java index dd1043478f90..4669910cdcb5 100644 --- a/server/src/test/java/org/apache/druid/server/metrics/BrokerSegmentCountStatsMonitorTest.java +++ b/server/src/test/java/org/apache/druid/server/metrics/BrokerSegmentStatsMonitorTest.java @@ -27,9 +27,9 @@ import java.util.Map; -public class BrokerSegmentCountStatsMonitorTest +public class BrokerSegmentStatsMonitorTest { - private BrokerSegmentCountStatsProvider statsProvider; + private BrokerSegmentStatsProvider statsProvider; private static final RowKey SEGMENT_METRIC_KEY1 = RowKey.with(Dimension.DATASOURCE, "dataSource1") .with(Dimension.VERSION, "2024-01-01T00:00:00.000Z") .with(Dimension.INTERVAL, "2024-01-01T00:00:00.000Z/2024-01-02T00:00:00.000Z") @@ -42,7 +42,7 @@ public class BrokerSegmentCountStatsMonitorTest @Test public void test_monitor() { - final BrokerSegmentCountStatsMonitor monitor = new BrokerSegmentCountStatsMonitor( + final BrokerSegmentStatsMonitor monitor = new BrokerSegmentStatsMonitor( () -> Map.of(SEGMENT_METRIC_KEY1, 10L, SEGMENT_METRIC_KEY2, 5L) ); final StubServiceEmitter emitter = new StubServiceEmitter("service", "host"); @@ -57,7 +57,7 @@ public void test_monitor() @Test public void test_monitor_withNullCounts() { - final BrokerSegmentCountStatsMonitor monitor = new BrokerSegmentCountStatsMonitor(() -> null); + final BrokerSegmentStatsMonitor monitor = new BrokerSegmentStatsMonitor(() -> null); final StubServiceEmitter emitter = new StubServiceEmitter(); Assert.assertTrue(monitor.doMonitor(emitter)); @@ -67,7 +67,7 @@ public void test_monitor_withNullCounts() @Test public void test_monitor_withEmptyCounts() { - final BrokerSegmentCountStatsMonitor monitor = new BrokerSegmentCountStatsMonitor(() -> Map.of()); + final BrokerSegmentStatsMonitor monitor = new BrokerSegmentStatsMonitor(() -> Map.of()); final StubServiceEmitter emitter = new StubServiceEmitter("service", "host"); Assert.assertTrue(monitor.doMonitor(emitter)); diff --git a/services/src/main/java/org/apache/druid/cli/CliBroker.java b/services/src/main/java/org/apache/druid/cli/CliBroker.java index 237ba401240f..866d967eca36 100644 --- a/services/src/main/java/org/apache/druid/cli/CliBroker.java +++ b/services/src/main/java/org/apache/druid/cli/CliBroker.java @@ -75,7 +75,7 @@ import org.apache.druid.server.http.SegmentListerResource; import org.apache.druid.server.http.SelfDiscoveryResource; import org.apache.druid.server.initialization.jetty.JettyServerInitializer; -import org.apache.druid.server.metrics.BrokerSegmentCountStatsProvider; +import org.apache.druid.server.metrics.BrokerSegmentStatsProvider; import org.apache.druid.server.metrics.QueryCountStatsProvider; import org.apache.druid.server.metrics.SubqueryCountStatsProvider; import org.apache.druid.server.router.TieredBrokerConfig; @@ -142,7 +142,7 @@ protected List getModules() LifecycleModule.register(binder, MetadataSegmentView.class); binder.bind(TimelineServerView.class).to(BrokerServerView.class).in(LazySingleton.class); binder.bind(QueryableDruidServer.Maker.class).to(DirectDruidClientFactory.class).in(LazySingleton.class); - binder.bind(BrokerSegmentCountStatsProvider.class).to(BrokerServerView.class); + binder.bind(BrokerSegmentStatsProvider.class).to(BrokerServerView.class); JsonConfigProvider.bind(binder, "druid.broker.cache", CacheConfig.class); binder.install(new CacheModule()); From 98a358abec240f4570720e8380db865a68e8883a Mon Sep 17 00:00:00 2001 From: Uddeshya Singh Date: Mon, 28 Jul 2025 14:00:54 +0530 Subject: [PATCH 07/14] Add serverview metrics and remove the available segments one --- docs/operations/metrics.md | 3 + .../indexing/EmbeddedIndexTaskTest.java | 6 +- .../EmbeddedMariaDBMetadataStoreTest.java | 4 ++ .../minio/EmbeddedMinIOStorageTest.java | 4 ++ .../msq/EmbeddedMSQRealtimeQueryTest.java | 2 +- .../apache/druid/client/BrokerServerView.java | 49 +++++++++++++--- .../metrics/BrokerSegmentStatsMonitor.java | 3 +- .../metrics/BrokerSegmentStatsProvider.java | 9 ++- .../druid/client/BrokerServerViewTest.java | 8 +-- .../BrokerSegmentStatsMonitorTest.java | 58 ++++++++++++++++--- 10 files changed, 117 insertions(+), 29 deletions(-) diff --git a/docs/operations/metrics.md b/docs/operations/metrics.md index 8fc1cf341645..55e69c5dff57 100644 --- a/docs/operations/metrics.md +++ b/docs/operations/metrics.md @@ -93,6 +93,9 @@ Most metric values reset each emission period, as specified in `druid.monitoring |`groupBy/spilledBytes`|Number of bytes spilled on the disk by the groupBy queries.|This metric is only available if the `GroupByStatsMonitor` module is included.|Varies| |`groupBy/mergeDictionarySize`|Size of on-heap merge dictionary in bytes.|This metric is only available if the `GroupByStatsMonitor` module is included.|Varies| +|`serverview/segment/added`|Number of segments added to the broker|This metric is only available if the `BrokerSegmentStatsMonitor` module is included.|Varies| +|`serverview/segment/removed`|Number of segments removed from the broker|This metric is only available if the `BrokerSegmentStatsMonitor` module is included.|Varies| + ### Historical |Metric|Description|Dimensions|Normal value| diff --git a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/indexing/EmbeddedIndexTaskTest.java b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/indexing/EmbeddedIndexTaskTest.java index 197d0ab5fed4..25eae6ad33f0 100644 --- a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/indexing/EmbeddedIndexTaskTest.java +++ b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/indexing/EmbeddedIndexTaskTest.java @@ -86,7 +86,7 @@ public void test_runIndexTask_forInlineDatasource() cluster.callApi().onLeaderOverlord(o -> o.runTask(taskId, task)); cluster.callApi().waitForTaskToSucceed(taskId, overlord); broker.latchableEmitter().waitForEventAggregate( - event -> event.hasMetricName("segment/available/count") + event -> event.hasMetricName("serverview/segment/added") .hasDimension(DruidMetrics.DATASOURCE, dataSource), agg -> agg.hasSumAtLeast(10) ); @@ -114,9 +114,9 @@ public void test_runIndexTask_forInlineDatasource() agg -> agg.hasSumAtLeast(10) ); broker.latchableEmitter().waitForEventAggregate( - event -> event.hasMetricName("segment/available/count") + event -> event.hasMetricName("serverview/segment/added") .hasDimension(DruidMetrics.DATASOURCE, dataSource), - agg -> agg.hasSumAtLeast(10) + agg -> agg.hasSumAtLeast(30) ); Assertions.assertEquals(Resources.CSV_DATA_10_DAYS, cluster.runSql("SELECT * FROM %s", dataSource)); Assertions.assertEquals("10", cluster.runSql("SELECT COUNT(*) FROM %s", dataSource)); diff --git a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/mariadb/EmbeddedMariaDBMetadataStoreTest.java b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/mariadb/EmbeddedMariaDBMetadataStoreTest.java index e1a8b0ca160e..6a3f96b126df 100644 --- a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/mariadb/EmbeddedMariaDBMetadataStoreTest.java +++ b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/mariadb/EmbeddedMariaDBMetadataStoreTest.java @@ -19,6 +19,7 @@ package org.apache.druid.testing.embedded.mariadb; +import org.apache.druid.testing.embedded.EmbeddedBroker; import org.apache.druid.testing.embedded.EmbeddedDruidCluster; import org.apache.druid.testing.embedded.EmbeddedRouter; import org.apache.druid.testing.embedded.indexing.EmbeddedIndexTaskTest; @@ -31,6 +32,9 @@ public class EmbeddedMariaDBMetadataStoreTest extends EmbeddedIndexTaskTest @Override public EmbeddedDruidCluster createCluster() { + final EmbeddedBroker broker = new EmbeddedBroker() + .addProperty("druid.monitoring.monitors", "[\"org.apache.druid.server.metrics.BrokerSegmentStatsMonitor\"]") + .addProperty("druid.monitoring.emissionPeriod", "PT0.1s"); return EmbeddedDruidCluster.withZookeeper() .useLatchableEmitter() .addResource(new MariaDBMetadataResource()) diff --git a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/minio/EmbeddedMinIOStorageTest.java b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/minio/EmbeddedMinIOStorageTest.java index 4bc5aa4737a8..b32d8ef18734 100644 --- a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/minio/EmbeddedMinIOStorageTest.java +++ b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/minio/EmbeddedMinIOStorageTest.java @@ -19,6 +19,7 @@ package org.apache.druid.testing.embedded.minio; +import org.apache.druid.testing.embedded.EmbeddedBroker; import org.apache.druid.testing.embedded.EmbeddedDruidCluster; import org.apache.druid.testing.embedded.EmbeddedRouter; import org.apache.druid.testing.embedded.indexing.EmbeddedIndexTaskTest; @@ -31,6 +32,9 @@ public class EmbeddedMinIOStorageTest extends EmbeddedIndexTaskTest @Override public EmbeddedDruidCluster createCluster() { + final EmbeddedBroker broker = new EmbeddedBroker() + .addProperty("druid.monitoring.monitors", "[\"org.apache.druid.server.metrics.BrokerSegmentStatsMonitor\"]") + .addProperty("druid.monitoring.emissionPeriod", "PT0.1s"); return EmbeddedDruidCluster.withEmbeddedDerbyAndZookeeper() .useLatchableEmitter() .addResource(new MinIOStorageResource()) diff --git a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/msq/EmbeddedMSQRealtimeQueryTest.java b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/msq/EmbeddedMSQRealtimeQueryTest.java index 27bdf268525c..ac8f7bec31d5 100644 --- a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/msq/EmbeddedMSQRealtimeQueryTest.java +++ b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/msq/EmbeddedMSQRealtimeQueryTest.java @@ -179,7 +179,7 @@ void setUpEach() agg -> agg.hasSumAtLeast(totalRows) ); broker.latchableEmitter().waitForEvent( - event -> event.hasMetricName("segment/available/count") + event -> event.hasMetricName("serverview/segment/added") .hasDimension(DruidMetrics.DATASOURCE, dataSource) .hasDimension(DruidMetrics.INTERVAL, "2015-09-12T00:00:00.000Z/2015-09-13T00:00:00.000Z") .hasValue(1) diff --git a/server/src/main/java/org/apache/druid/client/BrokerServerView.java b/server/src/main/java/org/apache/druid/client/BrokerServerView.java index c68648ae0706..c25d24278dc8 100644 --- a/server/src/main/java/org/apache/druid/client/BrokerServerView.java +++ b/server/src/main/java/org/apache/druid/client/BrokerServerView.java @@ -21,6 +21,7 @@ import com.google.common.base.Predicate; import com.google.common.collect.Ordering; +import com.google.errorprone.annotations.concurrent.GuardedBy; import com.google.inject.Inject; import org.apache.druid.client.selector.ServerSelector; import org.apache.druid.client.selector.TierSelectorStrategy; @@ -79,7 +80,12 @@ public class BrokerServerView implements TimelineServerView, BrokerSegmentStatsP private final CountDownLatch initialized = new CountDownLatch(1); private final FilteredServerInventoryView baseView; private final BrokerViewOfCoordinatorConfig brokerViewOfCoordinatorConfig; - private final ConcurrentHashMap segmentAvailableCount = new ConcurrentHashMap<>(); + private final ConcurrentHashMap totalSegmentAddCount = new ConcurrentHashMap<>(); + private final ConcurrentHashMap totalSegmentRemoveCount = new ConcurrentHashMap<>(); + @GuardedBy("totalSegmentAddCount") + private Map previousSegmentAddCount = new HashMap<>(); + @GuardedBy("totalSegmentRemoveCount") + private Map previousSegmentRemoveCount = new HashMap<>(); @Inject public BrokerServerView( @@ -444,23 +450,48 @@ public List getDruidServers() .collect(Collectors.toList()); } + private void incrementSegmentCount(RowKey key) + { + totalSegmentAddCount.compute(key, (k, currentValue) -> currentValue == null ? 1 : currentValue + 1); + } + @Override - public Map getAvailableSegmentCount() + public Map getSegmentAddedCount() { - return segmentAvailableCount; + final ConcurrentHashMap total = totalSegmentAddCount; + synchronized (totalSegmentAddCount) { + final Map delta = getDeltaValues(total, previousSegmentAddCount); + previousSegmentAddCount = total; + return delta; + } } - private void incrementSegmentCount(RowKey key) + @Override + public Map getSegmentRemovedCount() { - segmentAvailableCount.compute(key, (k, currentValue) -> currentValue == null ? 1 : currentValue + 1); + final ConcurrentHashMap total = totalSegmentRemoveCount; + synchronized (totalSegmentRemoveCount) { + final Map delta = getDeltaValues(total, previousSegmentRemoveCount); + previousSegmentRemoveCount = total; + return delta; + } } private void decrementSegmentCount(RowKey key) { - segmentAvailableCount.computeIfPresent(key, (k, currentValue) -> { - long newValue = currentValue - 1; - return newValue > 0 ? newValue : null; - }); + totalSegmentRemoveCount.compute(key, (k, currentValue) -> currentValue == null ? 1 : currentValue + 1); + } + + private Map getDeltaValues(Map total, Map prev) + { + final Map deltaValues = new HashMap<>(); + total.forEach( + (dataSource, totalCount) -> deltaValues.put( + dataSource, + totalCount - prev.getOrDefault(dataSource, 0L) + ) + ); + return deltaValues; } private static RowKey getMetricKey(final DataSegment segment) diff --git a/server/src/main/java/org/apache/druid/server/metrics/BrokerSegmentStatsMonitor.java b/server/src/main/java/org/apache/druid/server/metrics/BrokerSegmentStatsMonitor.java index 01713bd454d4..b48af0bccf89 100644 --- a/server/src/main/java/org/apache/druid/server/metrics/BrokerSegmentStatsMonitor.java +++ b/server/src/main/java/org/apache/druid/server/metrics/BrokerSegmentStatsMonitor.java @@ -46,7 +46,8 @@ public BrokerSegmentStatsMonitor(BrokerSegmentStatsProvider statsProvider) @Override public boolean doMonitor(ServiceEmitter emitter) { - emit(emitter, "segment/available/count", statsProvider.getAvailableSegmentCount()); + emit(emitter, "serverview/segment/added", statsProvider.getSegmentAddedCount()); + emit(emitter, "serverview/segment/removed", statsProvider.getSegmentRemovedCount()); return true; } diff --git a/server/src/main/java/org/apache/druid/server/metrics/BrokerSegmentStatsProvider.java b/server/src/main/java/org/apache/druid/server/metrics/BrokerSegmentStatsProvider.java index d2230090dc6d..3406196da93f 100644 --- a/server/src/main/java/org/apache/druid/server/metrics/BrokerSegmentStatsProvider.java +++ b/server/src/main/java/org/apache/druid/server/metrics/BrokerSegmentStatsProvider.java @@ -26,7 +26,12 @@ public interface BrokerSegmentStatsProvider { /** - * Return the number of segments queryable by a Broker for a datasource, interval and version. + * Return the number of segments recently added by a Broker for a datasource, interval and version. */ - Map getAvailableSegmentCount(); + Map getSegmentAddedCount(); + + /** + * Return the number of segments recently dropped by a Broker for a datasource, interval and version. + */ + Map getSegmentRemovedCount(); } diff --git a/server/src/test/java/org/apache/druid/client/BrokerServerViewTest.java b/server/src/test/java/org/apache/druid/client/BrokerServerViewTest.java index 8c2d8b16486e..f2aba71b5c39 100644 --- a/server/src/test/java/org/apache/druid/client/BrokerServerViewTest.java +++ b/server/src/test/java/org/apache/druid/client/BrokerServerViewTest.java @@ -120,8 +120,8 @@ public void testSingleServerAddedRemovedSegment() throws Exception announceSegmentForServer(druidServer, segment, zkPathsConfig, jsonMapper); Assert.assertTrue(timing.forWaiting().awaitLatch(segmentViewInitLatch)); Assert.assertTrue(timing.forWaiting().awaitLatch(segmentAddedLatch)); - Map availableSegmentCount = brokerServerView.getAvailableSegmentCount(); - for (Map.Entry entry : availableSegmentCount.entrySet()) { + Map segmentAddedCount = brokerServerView.getSegmentAddedCount(); + for (Map.Entry entry : segmentAddedCount.entrySet()) { Assert.assertEquals( RowKey.with(Dimension.DATASOURCE, segment.getDataSource()) .with(Dimension.INTERVAL, segment.getInterval().toString()) @@ -159,7 +159,7 @@ public void testSingleServerAddedRemovedSegment() throws Exception 0, timeline.lookup(intervals).size() ); - Assert.assertEquals(0, brokerServerView.getAvailableSegmentCount().size()); + Assert.assertEquals(1L, brokerServerView.getSegmentRemovedCount().size()); Assert.assertNull(timeline.findChunk(intervals, "v1", partition)); } @@ -210,7 +210,7 @@ public void testMultipleServerAddedRemovedSegment() throws Exception ) ) ); - Map availableSegmentCount = brokerServerView.getAvailableSegmentCount(); + Map availableSegmentCount = brokerServerView.getSegmentAddedCount(); Map expectedSegmentCount = new HashMap<>(); for (DataSegment segment : segments) { expectedSegmentCount.put( diff --git a/server/src/test/java/org/apache/druid/server/metrics/BrokerSegmentStatsMonitorTest.java b/server/src/test/java/org/apache/druid/server/metrics/BrokerSegmentStatsMonitorTest.java index 4669910cdcb5..598f592253e8 100644 --- a/server/src/test/java/org/apache/druid/server/metrics/BrokerSegmentStatsMonitorTest.java +++ b/server/src/test/java/org/apache/druid/server/metrics/BrokerSegmentStatsMonitorTest.java @@ -29,7 +29,6 @@ public class BrokerSegmentStatsMonitorTest { - private BrokerSegmentStatsProvider statsProvider; private static final RowKey SEGMENT_METRIC_KEY1 = RowKey.with(Dimension.DATASOURCE, "dataSource1") .with(Dimension.VERSION, "2024-01-01T00:00:00.000Z") .with(Dimension.INTERVAL, "2024-01-01T00:00:00.000Z/2024-01-02T00:00:00.000Z") @@ -42,22 +41,49 @@ public class BrokerSegmentStatsMonitorTest @Test public void test_monitor() { - final BrokerSegmentStatsMonitor monitor = new BrokerSegmentStatsMonitor( - () -> Map.of(SEGMENT_METRIC_KEY1, 10L, SEGMENT_METRIC_KEY2, 5L) - ); + BrokerSegmentStatsProvider statsProvider = new BrokerSegmentStatsProvider() + { + @Override + public Map getSegmentAddedCount() + { + return Map.of(SEGMENT_METRIC_KEY1, 10L, SEGMENT_METRIC_KEY2, 5L); + } + + @Override + public Map getSegmentRemovedCount() + { + return Map.of(SEGMENT_METRIC_KEY1, 1L); + } + }; + final BrokerSegmentStatsMonitor monitor = new BrokerSegmentStatsMonitor(statsProvider); final StubServiceEmitter emitter = new StubServiceEmitter("service", "host"); Assert.assertTrue(monitor.doMonitor(emitter)); - Assert.assertEquals(2, emitter.getNumEmittedEvents()); + Assert.assertEquals(3, emitter.getNumEmittedEvents()); - emitter.verifyValue("segment/available/count", Map.of("dataSource", "dataSource1", "version", "2024-01-01T00:00:00.000Z", "interval", "2024-01-01T00:00:00.000Z/2024-01-02T00:00:00.000Z"), 10L); - emitter.verifyValue("segment/available/count", Map.of("dataSource", "dataSource2", "version", "2024-01-02T00:00:00.000Z", "interval", "2024-01-02T00:00:00.000Z/2024-01-03T00:00:00.000Z"), 5L); + emitter.verifyValue("serverview/segment/added", Map.of("dataSource", "dataSource1", "version", "2024-01-01T00:00:00.000Z", "interval", "2024-01-01T00:00:00.000Z/2024-01-02T00:00:00.000Z"), 10L); + emitter.verifyValue("serverview/segment/added", Map.of("dataSource", "dataSource2", "version", "2024-01-02T00:00:00.000Z", "interval", "2024-01-02T00:00:00.000Z/2024-01-03T00:00:00.000Z"), 5L); + emitter.verifyValue("serverview/segment/removed", Map.of("dataSource", "dataSource1", "version", "2024-01-01T00:00:00.000Z", "interval", "2024-01-01T00:00:00.000Z/2024-01-02T00:00:00.000Z"), 1L); } @Test public void test_monitor_withNullCounts() { - final BrokerSegmentStatsMonitor monitor = new BrokerSegmentStatsMonitor(() -> null); + BrokerSegmentStatsProvider statsProvider = new BrokerSegmentStatsProvider() + { + @Override + public Map getSegmentAddedCount() + { + return null; + } + + @Override + public Map getSegmentRemovedCount() + { + return null; + } + }; + final BrokerSegmentStatsMonitor monitor = new BrokerSegmentStatsMonitor(statsProvider); final StubServiceEmitter emitter = new StubServiceEmitter(); Assert.assertTrue(monitor.doMonitor(emitter)); @@ -67,7 +93,21 @@ public void test_monitor_withNullCounts() @Test public void test_monitor_withEmptyCounts() { - final BrokerSegmentStatsMonitor monitor = new BrokerSegmentStatsMonitor(() -> Map.of()); + BrokerSegmentStatsProvider statsProvider = new BrokerSegmentStatsProvider() + { + @Override + public Map getSegmentAddedCount() + { + return Map.of(); + } + + @Override + public Map getSegmentRemovedCount() + { + return Map.of(); + } + }; + final BrokerSegmentStatsMonitor monitor = new BrokerSegmentStatsMonitor(statsProvider); final StubServiceEmitter emitter = new StubServiceEmitter("service", "host"); Assert.assertTrue(monitor.doMonitor(emitter)); From 1bbd7d44183572624df4ce0eae1a9673ef009d4a Mon Sep 17 00:00:00 2001 From: Uddeshya Singh Date: Mon, 28 Jul 2025 14:07:52 +0530 Subject: [PATCH 08/14] Docs oopsie --- docs/operations/metrics.md | 1 - 1 file changed, 1 deletion(-) diff --git a/docs/operations/metrics.md b/docs/operations/metrics.md index 55e69c5dff57..86afb0a2a071 100644 --- a/docs/operations/metrics.md +++ b/docs/operations/metrics.md @@ -92,7 +92,6 @@ Most metric values reset each emission period, as specified in `druid.monitoring |`groupBy/spilledQueries`|Number of groupBy queries that have spilled onto the disk.|This metric is only available if the `GroupByStatsMonitor` module is included.|Varies| |`groupBy/spilledBytes`|Number of bytes spilled on the disk by the groupBy queries.|This metric is only available if the `GroupByStatsMonitor` module is included.|Varies| |`groupBy/mergeDictionarySize`|Size of on-heap merge dictionary in bytes.|This metric is only available if the `GroupByStatsMonitor` module is included.|Varies| - |`serverview/segment/added`|Number of segments added to the broker|This metric is only available if the `BrokerSegmentStatsMonitor` module is included.|Varies| |`serverview/segment/removed`|Number of segments removed from the broker|This metric is only available if the `BrokerSegmentStatsMonitor` module is included.|Varies| From ab58000f9f8ce6aae3e2b5062959496c79185278 Mon Sep 17 00:00:00 2001 From: Uddeshya Singh Date: Mon, 28 Jul 2025 14:18:16 +0530 Subject: [PATCH 09/14] Fix the count based metric --- .../testing/embedded/indexing/EmbeddedIndexTaskTest.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/indexing/EmbeddedIndexTaskTest.java b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/indexing/EmbeddedIndexTaskTest.java index 25eae6ad33f0..3a57ed90f8bc 100644 --- a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/indexing/EmbeddedIndexTaskTest.java +++ b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/indexing/EmbeddedIndexTaskTest.java @@ -88,7 +88,7 @@ public void test_runIndexTask_forInlineDatasource() broker.latchableEmitter().waitForEventAggregate( event -> event.hasMetricName("serverview/segment/added") .hasDimension(DruidMetrics.DATASOURCE, dataSource), - agg -> agg.hasSumAtLeast(10) + agg -> agg.hasCountAtLeast(10) ); // Verify that the task created 10 DAY-granularity segments final List segments = new ArrayList<>( @@ -116,7 +116,7 @@ public void test_runIndexTask_forInlineDatasource() broker.latchableEmitter().waitForEventAggregate( event -> event.hasMetricName("serverview/segment/added") .hasDimension(DruidMetrics.DATASOURCE, dataSource), - agg -> agg.hasSumAtLeast(30) + agg -> agg.hasCountAtLeast(10) ); Assertions.assertEquals(Resources.CSV_DATA_10_DAYS, cluster.runSql("SELECT * FROM %s", dataSource)); Assertions.assertEquals("10", cluster.runSql("SELECT COUNT(*) FROM %s", dataSource)); From e3a48f31ebccd518eb1f54d1368ac16c24d1ad63 Mon Sep 17 00:00:00 2001 From: Uddeshya Singh Date: Mon, 28 Jul 2025 16:37:53 +0530 Subject: [PATCH 10/14] Fix metrics --- docs/operations/metrics.md | 4 ++-- .../embedded/mariadb/EmbeddedMariaDBMetadataStoreTest.java | 4 ---- .../testing/embedded/minio/EmbeddedMinIOStorageTest.java | 4 ---- 3 files changed, 2 insertions(+), 10 deletions(-) diff --git a/docs/operations/metrics.md b/docs/operations/metrics.md index 86afb0a2a071..045606e5849a 100644 --- a/docs/operations/metrics.md +++ b/docs/operations/metrics.md @@ -75,6 +75,8 @@ Most metric values reset each emission period, as specified in `druid.monitoring |`segment/schemaCache/poll/failed`|Number of failed coordinator polls to fetch datasource schema.||| |`metadatacache/schemaPoll/time`|Time taken for coordinator polls to fetch datasource schema.||| |`serverview/sync/healthy`|Sync status of the Broker with a segment-loading server such as a Historical or Peon. Emitted only when [HTTP-based server view](../configuration/index.md#segment-management) is enabled. This metric can be used in conjunction with `serverview/sync/unstableTime` to debug slow startup of Brokers.|`server`, `tier`|1 for fully synced servers, 0 otherwise| +|`serverview/segment/added`|Number of segments discovered and added by the Broker to its server view.|This metric is only available if the `BrokerSegmentStatsMonitor` module is included.|Varies| +|`serverview/segment/removed`|Number of segments removed by the Broker from its server view.|This metric is only available if the `BrokerSegmentStatsMonitor` module is included.|Varies| |`serverview/sync/unstableTime`|Time in milliseconds for which the Broker has been failing to sync with a segment-loading server. Emitted only when [HTTP-based server view](../configuration/index.md#segment-management) is enabled.|`server`, `tier`|Not emitted for synced servers.| |`subquery/rows`|Number of rows materialized by the subquery's results. |`id`, `subqueryId`| Varies | |`subquery/bytes`|Number of bytes materialized by the subquery's results. This metric is only emitted if the query uses [byte-based subquery guardrails](https://druid.apache.org/docs/latest/configuration/#guardrails-for-materialization-of-subqueries) |`id`, `subqueryId` | Varies | @@ -92,8 +94,6 @@ Most metric values reset each emission period, as specified in `druid.monitoring |`groupBy/spilledQueries`|Number of groupBy queries that have spilled onto the disk.|This metric is only available if the `GroupByStatsMonitor` module is included.|Varies| |`groupBy/spilledBytes`|Number of bytes spilled on the disk by the groupBy queries.|This metric is only available if the `GroupByStatsMonitor` module is included.|Varies| |`groupBy/mergeDictionarySize`|Size of on-heap merge dictionary in bytes.|This metric is only available if the `GroupByStatsMonitor` module is included.|Varies| -|`serverview/segment/added`|Number of segments added to the broker|This metric is only available if the `BrokerSegmentStatsMonitor` module is included.|Varies| -|`serverview/segment/removed`|Number of segments removed from the broker|This metric is only available if the `BrokerSegmentStatsMonitor` module is included.|Varies| ### Historical diff --git a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/mariadb/EmbeddedMariaDBMetadataStoreTest.java b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/mariadb/EmbeddedMariaDBMetadataStoreTest.java index 6a3f96b126df..e1a8b0ca160e 100644 --- a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/mariadb/EmbeddedMariaDBMetadataStoreTest.java +++ b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/mariadb/EmbeddedMariaDBMetadataStoreTest.java @@ -19,7 +19,6 @@ package org.apache.druid.testing.embedded.mariadb; -import org.apache.druid.testing.embedded.EmbeddedBroker; import org.apache.druid.testing.embedded.EmbeddedDruidCluster; import org.apache.druid.testing.embedded.EmbeddedRouter; import org.apache.druid.testing.embedded.indexing.EmbeddedIndexTaskTest; @@ -32,9 +31,6 @@ public class EmbeddedMariaDBMetadataStoreTest extends EmbeddedIndexTaskTest @Override public EmbeddedDruidCluster createCluster() { - final EmbeddedBroker broker = new EmbeddedBroker() - .addProperty("druid.monitoring.monitors", "[\"org.apache.druid.server.metrics.BrokerSegmentStatsMonitor\"]") - .addProperty("druid.monitoring.emissionPeriod", "PT0.1s"); return EmbeddedDruidCluster.withZookeeper() .useLatchableEmitter() .addResource(new MariaDBMetadataResource()) diff --git a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/minio/EmbeddedMinIOStorageTest.java b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/minio/EmbeddedMinIOStorageTest.java index b32d8ef18734..4bc5aa4737a8 100644 --- a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/minio/EmbeddedMinIOStorageTest.java +++ b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/minio/EmbeddedMinIOStorageTest.java @@ -19,7 +19,6 @@ package org.apache.druid.testing.embedded.minio; -import org.apache.druid.testing.embedded.EmbeddedBroker; import org.apache.druid.testing.embedded.EmbeddedDruidCluster; import org.apache.druid.testing.embedded.EmbeddedRouter; import org.apache.druid.testing.embedded.indexing.EmbeddedIndexTaskTest; @@ -32,9 +31,6 @@ public class EmbeddedMinIOStorageTest extends EmbeddedIndexTaskTest @Override public EmbeddedDruidCluster createCluster() { - final EmbeddedBroker broker = new EmbeddedBroker() - .addProperty("druid.monitoring.monitors", "[\"org.apache.druid.server.metrics.BrokerSegmentStatsMonitor\"]") - .addProperty("druid.monitoring.emissionPeriod", "PT0.1s"); return EmbeddedDruidCluster.withEmbeddedDerbyAndZookeeper() .useLatchableEmitter() .addResource(new MinIOStorageResource()) From 9d7d9e9de4921821dbfe87fde9650ebd5ed380ef Mon Sep 17 00:00:00 2001 From: Uddeshya Singh Date: Mon, 28 Jul 2025 21:06:03 +0530 Subject: [PATCH 11/14] Use individual blocking queues to emit and provide events --- .../apache/druid/client/BrokerServerView.java | 58 +++++-------------- .../server/coordinator/stats/Dimension.java | 5 +- .../metrics/BrokerSegmentStatsProvider.java | 4 +- .../druid/client/BrokerServerViewTest.java | 13 +++-- .../BrokerSegmentStatsMonitorTest.java | 14 ++--- 5 files changed, 32 insertions(+), 62 deletions(-) diff --git a/server/src/main/java/org/apache/druid/client/BrokerServerView.java b/server/src/main/java/org/apache/druid/client/BrokerServerView.java index c25d24278dc8..caaf9ec0d00d 100644 --- a/server/src/main/java/org/apache/druid/client/BrokerServerView.java +++ b/server/src/main/java/org/apache/druid/client/BrokerServerView.java @@ -21,7 +21,6 @@ import com.google.common.base.Predicate; import com.google.common.collect.Ordering; -import com.google.errorprone.annotations.concurrent.GuardedBy; import com.google.inject.Inject; import org.apache.druid.client.selector.ServerSelector; import org.apache.druid.client.selector.TierSelectorStrategy; @@ -56,6 +55,7 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.Executor; import java.util.concurrent.ExecutorService; +import java.util.concurrent.LinkedBlockingQueue; import java.util.function.Function; import java.util.stream.Collectors; @@ -80,12 +80,8 @@ public class BrokerServerView implements TimelineServerView, BrokerSegmentStatsP private final CountDownLatch initialized = new CountDownLatch(1); private final FilteredServerInventoryView baseView; private final BrokerViewOfCoordinatorConfig brokerViewOfCoordinatorConfig; - private final ConcurrentHashMap totalSegmentAddCount = new ConcurrentHashMap<>(); - private final ConcurrentHashMap totalSegmentRemoveCount = new ConcurrentHashMap<>(); - @GuardedBy("totalSegmentAddCount") - private Map previousSegmentAddCount = new HashMap<>(); - @GuardedBy("totalSegmentRemoveCount") - private Map previousSegmentRemoveCount = new HashMap<>(); + private final LinkedBlockingQueue segmentAddEvents = new LinkedBlockingQueue<>(); + private final LinkedBlockingQueue segmentRemoveEvents = new LinkedBlockingQueue<>(); @Inject public BrokerServerView( @@ -292,7 +288,7 @@ private void serverAddedSegment(final DruidServerMetadata server, final DataSegm timeline.add(segment.getInterval(), segment.getVersion(), segment.getShardSpec().createChunk(selector)); selectors.put(segmentId, selector); - incrementSegmentCount(getMetricKey(segment)); + segmentAddEvents.add(getMetricKey(segment, server)); } QueryableDruidServer queryableDruidServer = clients.get(server.getName()); @@ -369,7 +365,7 @@ private void serverRemovedSegment(DruidServerMetadata server, DataSegment segmen segment.getVersion() ); } else { - decrementSegmentCount(getMetricKey(segment)); + segmentRemoveEvents.add(getMetricKey(segment, server)); runTimelineCallbacks(callback -> callback.segmentRemoved(segment)); } } @@ -450,54 +446,30 @@ public List getDruidServers() .collect(Collectors.toList()); } - private void incrementSegmentCount(RowKey key) + private static RowKey getMetricKey(final DataSegment segment, DruidServerMetadata serverMetadata) { - totalSegmentAddCount.compute(key, (k, currentValue) -> currentValue == null ? 1 : currentValue + 1); + return RowKey.with(Dimension.DATASOURCE, segment.getDataSource()) + .with(Dimension.SERVER, serverMetadata.getName()) + .and(Dimension.DESCRIPTION, segment.getId().toString()); } @Override public Map getSegmentAddedCount() { - final ConcurrentHashMap total = totalSegmentAddCount; - synchronized (totalSegmentAddCount) { - final Map delta = getDeltaValues(total, previousSegmentAddCount); - previousSegmentAddCount = total; - return delta; - } + return drainAndCollectEvents(segmentAddEvents); } @Override public Map getSegmentRemovedCount() { - final ConcurrentHashMap total = totalSegmentRemoveCount; - synchronized (totalSegmentRemoveCount) { - final Map delta = getDeltaValues(total, previousSegmentRemoveCount); - previousSegmentRemoveCount = total; - return delta; - } - } - - private void decrementSegmentCount(RowKey key) - { - totalSegmentRemoveCount.compute(key, (k, currentValue) -> currentValue == null ? 1 : currentValue + 1); + return drainAndCollectEvents(segmentRemoveEvents); } - private Map getDeltaValues(Map total, Map prev) + private Map drainAndCollectEvents(LinkedBlockingQueue eventQueue) { - final Map deltaValues = new HashMap<>(); - total.forEach( - (dataSource, totalCount) -> deltaValues.put( - dataSource, - totalCount - prev.getOrDefault(dataSource, 0L) - ) - ); - return deltaValues; + final List currentEvents = new ArrayList<>(); + eventQueue.drainTo(currentEvents); + return currentEvents.stream().collect(Collectors.groupingBy(e -> e, Collectors.counting())); } - private static RowKey getMetricKey(final DataSegment segment) - { - return RowKey.with(Dimension.DATASOURCE, segment.getDataSource()) - .with(Dimension.INTERVAL, String.valueOf(segment.getInterval())) - .and(Dimension.VERSION, segment.getVersion()); - } } diff --git a/server/src/main/java/org/apache/druid/server/coordinator/stats/Dimension.java b/server/src/main/java/org/apache/druid/server/coordinator/stats/Dimension.java index 9d119b88a2f1..0f15da2997c3 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/stats/Dimension.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/stats/Dimension.java @@ -30,10 +30,7 @@ public enum Dimension DUTY("duty"), DUTY_GROUP("dutyGroup"), DESCRIPTION("description"), - SERVER("server"), - INTERVAL("interval"), - VERSION("version"); - + SERVER("server"); private final String reportedName; Dimension(String name) diff --git a/server/src/main/java/org/apache/druid/server/metrics/BrokerSegmentStatsProvider.java b/server/src/main/java/org/apache/druid/server/metrics/BrokerSegmentStatsProvider.java index 3406196da93f..de5c26f09c71 100644 --- a/server/src/main/java/org/apache/druid/server/metrics/BrokerSegmentStatsProvider.java +++ b/server/src/main/java/org/apache/druid/server/metrics/BrokerSegmentStatsProvider.java @@ -26,12 +26,12 @@ public interface BrokerSegmentStatsProvider { /** - * Return the number of segments recently added by a Broker for a datasource, interval and version. + * Return the number of segments recently added by a Broker for a datasource, service and description. */ Map getSegmentAddedCount(); /** - * Return the number of segments recently dropped by a Broker for a datasource, interval and version. + * Return the number of segments recently dropped by a Broker for a datasource, service and description. */ Map getSegmentRemovedCount(); } diff --git a/server/src/test/java/org/apache/druid/client/BrokerServerViewTest.java b/server/src/test/java/org/apache/druid/client/BrokerServerViewTest.java index f2aba71b5c39..f5d741418a8b 100644 --- a/server/src/test/java/org/apache/druid/client/BrokerServerViewTest.java +++ b/server/src/test/java/org/apache/druid/client/BrokerServerViewTest.java @@ -124,8 +124,8 @@ public void testSingleServerAddedRemovedSegment() throws Exception for (Map.Entry entry : segmentAddedCount.entrySet()) { Assert.assertEquals( RowKey.with(Dimension.DATASOURCE, segment.getDataSource()) - .with(Dimension.INTERVAL, segment.getInterval().toString()) - .with(Dimension.VERSION, segment.getVersion()) + .with(Dimension.DESCRIPTION, segment.getId().toString()) + .with(Dimension.SERVER, druidServer.getName()) .build(), entry.getKey() ); @@ -212,15 +212,16 @@ public void testMultipleServerAddedRemovedSegment() throws Exception ); Map availableSegmentCount = brokerServerView.getSegmentAddedCount(); Map expectedSegmentCount = new HashMap<>(); - for (DataSegment segment : segments) { + for (int i = 0; i < 5; ++i) { expectedSegmentCount.put( - RowKey.with(Dimension.DATASOURCE, segment.getDataSource()) - .with(Dimension.INTERVAL, segment.getInterval().toString()) - .with(Dimension.VERSION, segment.getVersion()) + RowKey.with(Dimension.DATASOURCE, segments.get(i).getDataSource()) + .with(Dimension.DESCRIPTION, segments.get(i).getId().toString()) + .with(Dimension.SERVER, druidServers.get(i).getName()) .build(), 1L ); } + Assert.assertEquals(expectedSegmentCount, availableSegmentCount); // unannounce the segment created by dataSegmentWithIntervalAndVersion("2011-04-01/2011-04-09", "v2") diff --git a/server/src/test/java/org/apache/druid/server/metrics/BrokerSegmentStatsMonitorTest.java b/server/src/test/java/org/apache/druid/server/metrics/BrokerSegmentStatsMonitorTest.java index 598f592253e8..8badd767fe51 100644 --- a/server/src/test/java/org/apache/druid/server/metrics/BrokerSegmentStatsMonitorTest.java +++ b/server/src/test/java/org/apache/druid/server/metrics/BrokerSegmentStatsMonitorTest.java @@ -30,12 +30,12 @@ public class BrokerSegmentStatsMonitorTest { private static final RowKey SEGMENT_METRIC_KEY1 = RowKey.with(Dimension.DATASOURCE, "dataSource1") - .with(Dimension.VERSION, "2024-01-01T00:00:00.000Z") - .with(Dimension.INTERVAL, "2024-01-01T00:00:00.000Z/2024-01-02T00:00:00.000Z") + .with(Dimension.DESCRIPTION, "someSegmentID1") + .with(Dimension.SERVER, "serverName1") .build(); private static final RowKey SEGMENT_METRIC_KEY2 = RowKey.with(Dimension.DATASOURCE, "dataSource2") - .with(Dimension.VERSION, "2024-01-02T00:00:00.000Z") - .with(Dimension.INTERVAL, "2024-01-02T00:00:00.000Z/2024-01-03T00:00:00.000Z") + .with(Dimension.DESCRIPTION, "someSegmentID2") + .with(Dimension.SERVER, "serverName2") .build(); @Test @@ -61,9 +61,9 @@ public Map getSegmentRemovedCount() Assert.assertEquals(3, emitter.getNumEmittedEvents()); - emitter.verifyValue("serverview/segment/added", Map.of("dataSource", "dataSource1", "version", "2024-01-01T00:00:00.000Z", "interval", "2024-01-01T00:00:00.000Z/2024-01-02T00:00:00.000Z"), 10L); - emitter.verifyValue("serverview/segment/added", Map.of("dataSource", "dataSource2", "version", "2024-01-02T00:00:00.000Z", "interval", "2024-01-02T00:00:00.000Z/2024-01-03T00:00:00.000Z"), 5L); - emitter.verifyValue("serverview/segment/removed", Map.of("dataSource", "dataSource1", "version", "2024-01-01T00:00:00.000Z", "interval", "2024-01-01T00:00:00.000Z/2024-01-02T00:00:00.000Z"), 1L); + emitter.verifyValue("serverview/segment/added", Map.of("dataSource", "dataSource1", "description", "someSegmentID1", "server", "serverName1"), 10L); + emitter.verifyValue("serverview/segment/added", Map.of("dataSource", "dataSource2", "description", "someSegmentID2", "server", "serverName2"), 5L); + emitter.verifyValue("serverview/segment/removed", Map.of("dataSource", "dataSource1", "description", "someSegmentID1", "server", "serverName1"), 1L); } @Test From dfbfb10aa99d01740e9f049b666c54e46cecb893 Mon Sep 17 00:00:00 2001 From: Uddeshya Singh Date: Mon, 28 Jul 2025 21:26:03 +0530 Subject: [PATCH 12/14] Fix relatime query tests --- .../druid/testing/embedded/msq/EmbeddedMSQRealtimeQueryTest.java | 1 - 1 file changed, 1 deletion(-) diff --git a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/msq/EmbeddedMSQRealtimeQueryTest.java b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/msq/EmbeddedMSQRealtimeQueryTest.java index ac8f7bec31d5..316945ee9c86 100644 --- a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/msq/EmbeddedMSQRealtimeQueryTest.java +++ b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/msq/EmbeddedMSQRealtimeQueryTest.java @@ -181,7 +181,6 @@ void setUpEach() broker.latchableEmitter().waitForEvent( event -> event.hasMetricName("serverview/segment/added") .hasDimension(DruidMetrics.DATASOURCE, dataSource) - .hasDimension(DruidMetrics.INTERVAL, "2015-09-12T00:00:00.000Z/2015-09-13T00:00:00.000Z") .hasValue(1) ); } From 138c42374333c5ce0eeeb64b969eed75b4b2bdd2 Mon Sep 17 00:00:00 2001 From: Uddeshya Singh Date: Mon, 28 Jul 2025 22:13:19 +0530 Subject: [PATCH 13/14] Address review comments --- .../testing/embedded/msq/EmbeddedMSQRealtimeQueryTest.java | 1 - .../src/main/java/org/apache/druid/client/BrokerServerView.java | 2 +- .../org/apache/druid/server/coordinator/stats/Dimension.java | 1 + 3 files changed, 2 insertions(+), 2 deletions(-) diff --git a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/msq/EmbeddedMSQRealtimeQueryTest.java b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/msq/EmbeddedMSQRealtimeQueryTest.java index 316945ee9c86..91993bafac09 100644 --- a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/msq/EmbeddedMSQRealtimeQueryTest.java +++ b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/msq/EmbeddedMSQRealtimeQueryTest.java @@ -181,7 +181,6 @@ void setUpEach() broker.latchableEmitter().waitForEvent( event -> event.hasMetricName("serverview/segment/added") .hasDimension(DruidMetrics.DATASOURCE, dataSource) - .hasValue(1) ); } diff --git a/server/src/main/java/org/apache/druid/client/BrokerServerView.java b/server/src/main/java/org/apache/druid/client/BrokerServerView.java index caaf9ec0d00d..d4df47e75029 100644 --- a/server/src/main/java/org/apache/druid/client/BrokerServerView.java +++ b/server/src/main/java/org/apache/druid/client/BrokerServerView.java @@ -446,7 +446,7 @@ public List getDruidServers() .collect(Collectors.toList()); } - private static RowKey getMetricKey(final DataSegment segment, DruidServerMetadata serverMetadata) + private static RowKey getMetricKey(final DataSegment segment, final DruidServerMetadata serverMetadata) { return RowKey.with(Dimension.DATASOURCE, segment.getDataSource()) .with(Dimension.SERVER, serverMetadata.getName()) diff --git a/server/src/main/java/org/apache/druid/server/coordinator/stats/Dimension.java b/server/src/main/java/org/apache/druid/server/coordinator/stats/Dimension.java index 0f15da2997c3..7001f7fa4550 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/stats/Dimension.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/stats/Dimension.java @@ -31,6 +31,7 @@ public enum Dimension DUTY_GROUP("dutyGroup"), DESCRIPTION("description"), SERVER("server"); + private final String reportedName; Dimension(String name) From de4f9822252be8a5d2b7170212eb5590012edbdc Mon Sep 17 00:00:00 2001 From: Uddeshya Singh Date: Tue, 29 Jul 2025 11:34:56 +0530 Subject: [PATCH 14/14] Run tests only when historical sends the segment --- .../druid/testing/embedded/msq/EmbeddedMSQRealtimeQueryTest.java | 1 + .../src/main/java/org/apache/druid/query/DruidMetrics.java | 1 + 2 files changed, 2 insertions(+) diff --git a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/msq/EmbeddedMSQRealtimeQueryTest.java b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/msq/EmbeddedMSQRealtimeQueryTest.java index 91993bafac09..fee507c84a5c 100644 --- a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/msq/EmbeddedMSQRealtimeQueryTest.java +++ b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/msq/EmbeddedMSQRealtimeQueryTest.java @@ -181,6 +181,7 @@ void setUpEach() broker.latchableEmitter().waitForEvent( event -> event.hasMetricName("serverview/segment/added") .hasDimension(DruidMetrics.DATASOURCE, dataSource) + .hasDimension(DruidMetrics.SERVER, "localhost:8091") ); } diff --git a/processing/src/main/java/org/apache/druid/query/DruidMetrics.java b/processing/src/main/java/org/apache/druid/query/DruidMetrics.java index 923eb11656fb..3f4271dba83f 100644 --- a/processing/src/main/java/org/apache/druid/query/DruidMetrics.java +++ b/processing/src/main/java/org/apache/druid/query/DruidMetrics.java @@ -59,6 +59,7 @@ public class DruidMetrics public static final String CATEGORY = "category"; public static final String WORKER_VERSION = "workerVersion"; + public static final String SERVER = "server"; public static int findNumComplexAggs(List aggs) {