1 change: 1 addition & 0 deletions docs/configuration/index.md
@@ -1960,6 +1960,7 @@ The following table lists the monitors that are available and the services you c
|`org.apache.druid.server.metrics.WorkerTaskCountStatsMonitor`|Reports how many ingestion tasks are currently running/pending/waiting, the number of successful/failed tasks, and metrics about task slot usage for the reporting worker, per emission period. |MiddleManager, Indexer|
|`org.apache.druid.server.metrics.ServiceStatusMonitor`|Reports a heartbeat for the service.|Any|
|`org.apache.druid.server.metrics.GroupByStatsMonitor`|Reports metrics for groupBy queries, such as disk and merge buffer utilization.|Broker, Historical, Indexer, Peon|
|`org.apache.druid.server.metrics.BrokerSegmentStatsMonitor`|Reports the number of segments of each datasource added to or removed from the Broker's server view, per emission period.|Broker|

For example, if you only wanted monitors on all services for system and JVM information, you'd add the following to `common.runtime.properties`:
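A sketch of such a configuration, assuming the standard `SysMonitor` and `JvmMonitor` class names; because `BrokerSegmentStatsMonitor` is load-scoped to the Broker role, it is shown going into the Broker's own `runtime.properties`, and the emission period is illustrative:

```properties
# common.runtime.properties (sketch): system and JVM monitors on all services.
druid.monitoring.monitors=["org.apache.druid.java.util.metrics.SysMonitor","org.apache.druid.java.util.metrics.JvmMonitor"]

# Broker runtime.properties (sketch): add the Broker-scoped segment-stats monitor.
druid.monitoring.monitors=["org.apache.druid.java.util.metrics.JvmMonitor","org.apache.druid.server.metrics.BrokerSegmentStatsMonitor"]
druid.monitoring.emissionPeriod=PT1M
```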

2 changes: 2 additions & 0 deletions docs/operations/metrics.md
@@ -75,6 +75,8 @@ Most metric values reset each emission period, as specified in `druid.monitoring.
|`segment/schemaCache/poll/failed`|Number of failed coordinator polls to fetch datasource schema.|||
|`metadatacache/schemaPoll/time`|Time taken for coordinator polls to fetch datasource schema.|||
|`serverview/sync/healthy`|Sync status of the Broker with a segment-loading server such as a Historical or Peon. Emitted only when [HTTP-based server view](../configuration/index.md#segment-management) is enabled. This metric can be used in conjunction with `serverview/sync/unstableTime` to debug slow startup of Brokers.|`server`, `tier`|1 for fully synced servers, 0 otherwise|
|`serverview/segment/added`|Number of segments discovered and added by the Broker to its server view during the emission period. Emitted only when the `BrokerSegmentStatsMonitor` monitor is enabled.|`dataSource`, `server`, `description`|Varies|
|`serverview/segment/removed`|Number of segments removed by the Broker from its server view during the emission period. Emitted only when the `BrokerSegmentStatsMonitor` monitor is enabled.|`dataSource`, `server`, `description`|Varies|
|`serverview/sync/unstableTime`|Time in milliseconds for which the Broker has been failing to sync with a segment-loading server. Emitted only when [HTTP-based server view](../configuration/index.md#segment-management) is enabled.|`server`, `tier`|Not emitted for synced servers.|
|`subquery/rows`|Number of rows materialized by the subquery's results. |`id`, `subqueryId`| Varies |
|`subquery/bytes`|Number of bytes materialized by the subquery's results. This metric is only emitted if the query uses [byte-based subquery guardrails](https://druid.apache.org/docs/latest/configuration/#guardrails-for-materialization-of-subqueries) |`id`, `subqueryId` | Varies |
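For orientation, a single event from this monitor, as serialized by Druid's JSON-based emitters, might look roughly like the following; the envelope fields follow the usual metric-event shape and all values are illustrative. Because the segment ID travels in the `description` dimension, the `value` for a given key is typically 1 per emission period:

```json
{
  "feed": "metrics",
  "timestamp": "2024-01-01T00:00:10.000Z",
  "service": "druid/broker",
  "host": "localhost:8082",
  "metric": "serverview/segment/added",
  "value": 1,
  "dataSource": "wikipedia",
  "server": "localhost:8083",
  "description": "wikipedia_2024-01-01T00:00:00.000Z_2024-01-02T00:00:00.000Z_v1"
}
```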
EmbeddedIndexTaskTest.java
@@ -52,7 +52,9 @@
*/
public class EmbeddedIndexTaskTest extends EmbeddedClusterTestBase
{
-  protected final EmbeddedBroker broker = new EmbeddedBroker();
+  protected final EmbeddedBroker broker = new EmbeddedBroker()
+      .addProperty("druid.monitoring.monitors", "[\"org.apache.druid.server.metrics.BrokerSegmentStatsMonitor\"]")
+      .addProperty("druid.monitoring.emissionPeriod", "PT0.1s");
protected final EmbeddedIndexer indexer = new EmbeddedIndexer().addProperty("druid.worker.capacity", "25");
protected final EmbeddedOverlord overlord = new EmbeddedOverlord();
protected final EmbeddedHistorical historical = new EmbeddedHistorical();
@@ -83,7 +85,11 @@ public void test_runIndexTask_forInlineDatasource()

cluster.callApi().onLeaderOverlord(o -> o.runTask(taskId, task));
cluster.callApi().waitForTaskToSucceed(taskId, overlord);

broker.latchableEmitter().waitForEventAggregate(
event -> event.hasMetricName("serverview/segment/added")
.hasDimension(DruidMetrics.DATASOURCE, dataSource),
agg -> agg.hasCountAtLeast(10)
);
// Verify that the task created 10 DAY-granularity segments
final List<DataSegment> segments = new ArrayList<>(
overlord.bindings().segmentsMetadataStorage().retrieveAllUsedSegments(dataSource, null)
@@ -107,8 +113,10 @@ public void test_runIndexTask_forInlineDatasource()
.hasDimension(DruidMetrics.DATASOURCE, dataSource),
agg -> agg.hasSumAtLeast(10)
);
-    broker.latchableEmitter().waitForEvent(
-        event -> event.hasDimension(DruidMetrics.DATASOURCE, dataSource)
+    broker.latchableEmitter().waitForEventAggregate(
+        event -> event.hasMetricName("serverview/segment/added")
+                      .hasDimension(DruidMetrics.DATASOURCE, dataSource),
+        agg -> agg.hasCountAtLeast(10)
);
Assertions.assertEquals(Resources.CSV_DATA_10_DAYS, cluster.runSql("SELECT * FROM %s", dataSource));
Assertions.assertEquals("10", cluster.runSql("SELECT COUNT(*) FROM %s", dataSource));
@@ -104,7 +104,9 @@ public EmbeddedDruidCluster createCluster()
.addProperty("druid.manager.segments.pollDuration", "PT0.1s");

broker.addProperty("druid.msq.dart.controller.heapFraction", "0.9")
.addProperty("druid.query.default.context.maxConcurrentStages", "1");
.addProperty("druid.query.default.context.maxConcurrentStages", "1")
.addProperty("druid.monitoring.emissionPeriod", "PT0.1s")
.addProperty("druid.monitoring.monitors", "[\"org.apache.druid.server.metrics.BrokerSegmentStatsMonitor\"]");

historical.addProperty("druid.msq.dart.worker.heapFraction", "0.9")
.addProperty("druid.msq.dart.worker.concurrentQueries", "1")
@@ -177,7 +179,9 @@ void setUpEach()
agg -> agg.hasSumAtLeast(totalRows)
);
broker.latchableEmitter().waitForEvent(
-        event -> event.hasDimension(DruidMetrics.DATASOURCE, dataSource)
+        event -> event.hasMetricName("serverview/segment/added")
+                      .hasDimension(DruidMetrics.DATASOURCE, dataSource)
+                      .hasDimension(DruidMetrics.SERVER, "localhost:8091")
);
}

DruidMetrics.java
@@ -59,6 +59,7 @@ public class DruidMetrics

public static final String CATEGORY = "category";
public static final String WORKER_VERSION = "workerVersion";
public static final String SERVER = "server";

public static int findNumComplexAggs(List<AggregatorFactory> aggs)
{
40 changes: 38 additions & 2 deletions server/src/main/java/org/apache/druid/client/BrokerServerView.java
@@ -37,6 +37,9 @@
import org.apache.druid.segment.realtime.appenderator.SegmentSchemas;
import org.apache.druid.server.coordination.DruidServerMetadata;
import org.apache.druid.server.coordination.ServerType;
import org.apache.druid.server.coordinator.stats.Dimension;
import org.apache.druid.server.coordinator.stats.RowKey;
import org.apache.druid.server.metrics.BrokerSegmentStatsProvider;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.SegmentId;
import org.apache.druid.timeline.VersionedIntervalTimeline;
@@ -52,14 +55,15 @@
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Function;
import java.util.stream.Collectors;

/**
*
*/
@ManageLifecycle
-public class BrokerServerView implements TimelineServerView
+public class BrokerServerView implements TimelineServerView, BrokerSegmentStatsProvider
{
private static final Logger log = new Logger(BrokerServerView.class);

Expand All @@ -76,6 +80,8 @@ public class BrokerServerView implements TimelineServerView
private final CountDownLatch initialized = new CountDownLatch(1);
private final FilteredServerInventoryView baseView;
private final BrokerViewOfCoordinatorConfig brokerViewOfCoordinatorConfig;
private final LinkedBlockingQueue<RowKey> segmentAddEvents = new LinkedBlockingQueue<>();
private final LinkedBlockingQueue<RowKey> segmentRemoveEvents = new LinkedBlockingQueue<>();

@Inject
public BrokerServerView(
@@ -159,7 +165,8 @@ public CallbackAction segmentSchemasAnnounced(SegmentSchemas segmentSchemas)

baseView.registerServerCallback(
exec,
-        new ServerCallback() {
+        new ServerCallback()
+        {
@Override
public CallbackAction serverAdded(DruidServer server)
{
@@ -281,6 +288,7 @@ private void serverAddedSegment(final DruidServerMetadata server, final DataSegm

timeline.add(segment.getInterval(), segment.getVersion(), segment.getShardSpec().createChunk(selector));
selectors.put(segmentId, selector);
segmentAddEvents.add(getMetricKey(segment, server));
}

QueryableDruidServer queryableDruidServer = clients.get(server.getName());
@@ -357,6 +365,7 @@ private void serverRemovedSegment(DruidServerMetadata server, DataSegment segmen
segment.getVersion()
);
} else {
segmentRemoveEvents.add(getMetricKey(segment, server));
runTimelineCallbacks(callback -> callback.segmentRemoved(segment));
}
}
@@ -436,4 +445,31 @@ public List<ImmutableDruidServer> getDruidServers()
.map(queryableDruidServer -> queryableDruidServer.getServer().toImmutableDruidServer())
.collect(Collectors.toList());
}

private static RowKey getMetricKey(final DataSegment segment, final DruidServerMetadata serverMetadata)
{
return RowKey.with(Dimension.DATASOURCE, segment.getDataSource())
.with(Dimension.SERVER, serverMetadata.getName())
.and(Dimension.DESCRIPTION, segment.getId().toString());
}

@Override
public Map<RowKey, Long> getSegmentAddedCount()
{
return drainAndCollectEvents(segmentAddEvents);
}

@Override
public Map<RowKey, Long> getSegmentRemovedCount()
{
return drainAndCollectEvents(segmentRemoveEvents);
}

private Map<RowKey, Long> drainAndCollectEvents(LinkedBlockingQueue<RowKey> eventQueue)
{
final List<RowKey> currentEvents = new ArrayList<>();
eventQueue.drainTo(currentEvents);
return currentEvents.stream().collect(Collectors.groupingBy(e -> e, Collectors.counting()));
}

}
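The bookkeeping above is a drain-and-count pattern: server-view callbacks enqueue one key per segment event without blocking, and each monitor poll drains the queue and aggregates what arrived since the previous poll. A minimal standalone sketch of the pattern, with hypothetical names:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.stream.Collectors;

// Sketch of the drain-and-count pattern used by BrokerServerView: producers
// record one key per event; a periodic consumer drains the queue and reports
// how many events arrived per key since the last drain.
class EventCounter<K>
{
  private final LinkedBlockingQueue<K> events = new LinkedBlockingQueue<>();

  // Called from producer threads (e.g. timeline callbacks). add() never blocks
  // on an unbounded queue, so the hot path stays cheap.
  void record(K key)
  {
    events.add(key);
  }

  // Called once per emission period. drainTo() removes everything currently
  // queued, so each event is counted exactly once across successive polls.
  Map<K, Long> drainCounts()
  {
    final List<K> drained = new ArrayList<>();
    events.drainTo(drained);
    return drained.stream().collect(Collectors.groupingBy(k -> k, Collectors.counting()));
  }
}
```

The unbounded queue keeps producers non-blocking; the trade-off is that a stalled monitor lets the queue grow, which is acceptable here because the monitor drains it on every emission period.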
BrokerSegmentStatsMonitor.java
@@ -0,0 +1,64 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.druid.server.metrics;

import com.google.inject.Inject;
import org.apache.druid.discovery.NodeRole;
import org.apache.druid.guice.annotations.LoadScope;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
import org.apache.druid.java.util.metrics.AbstractMonitor;
import org.apache.druid.server.coordinator.stats.RowKey;

import java.util.Map;

/**
 * Monitor that reports the number of segments added to or removed from this Broker's server view,
 * keyed by datasource, server, and segment. Counts accumulate between polls and reset on each emission.
 */
@LoadScope(roles = {NodeRole.BROKER_JSON_NAME})
public class BrokerSegmentStatsMonitor extends AbstractMonitor
{
private final BrokerSegmentStatsProvider statsProvider;

@Inject
public BrokerSegmentStatsMonitor(BrokerSegmentStatsProvider statsProvider)
{
this.statsProvider = statsProvider;
}

@Override
public boolean doMonitor(ServiceEmitter emitter)
{
emit(emitter, "serverview/segment/added", statsProvider.getSegmentAddedCount());
emit(emitter, "serverview/segment/removed", statsProvider.getSegmentRemovedCount());
return true;
}

  private void emit(ServiceEmitter emitter, String key, Map<RowKey, Long> counts)
  {
    if (counts != null) {
      counts.forEach((k, v) -> {
        // Build a fresh event per row key so that dimensions set for one key
        // cannot leak into the event emitted for the next.
        final ServiceMetricEvent.Builder builder = new ServiceMetricEvent.Builder();
        k.getValues().forEach((dim, value) -> builder.setDimension(dim.reportedName(), value));
        emitter.emit(builder.setMetric(key, v));
      });
    }
  }
}
BrokerSegmentStatsProvider.java
@@ -0,0 +1,37 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.druid.server.metrics;

import org.apache.druid.server.coordinator.stats.RowKey;

import java.util.Map;

public interface BrokerSegmentStatsProvider
{
/**
 * Returns the number of segment-added events recorded by this Broker since the last poll, keyed by datasource, server, and segment description.
*/
Map<RowKey, Long> getSegmentAddedCount();

/**
 * Returns the number of segment-removed events recorded by this Broker since the last poll, keyed by datasource, server, and segment description.
*/
Map<RowKey, Long> getSegmentRemovedCount();
}
@@ -47,6 +47,8 @@
import org.apache.druid.server.coordination.DruidServerMetadata;
import org.apache.druid.server.coordination.ServerType;
import org.apache.druid.server.coordination.TestCoordinatorClient;
import org.apache.druid.server.coordinator.stats.Dimension;
import org.apache.druid.server.coordinator.stats.RowKey;
import org.apache.druid.server.initialization.ZkPathsConfig;
import org.apache.druid.server.metrics.NoopServiceEmitter;
import org.apache.druid.timeline.DataSegment;
@@ -64,7 +66,9 @@

import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
@@ -116,6 +120,17 @@ public void testSingleServerAddedRemovedSegment() throws Exception
announceSegmentForServer(druidServer, segment, zkPathsConfig, jsonMapper);
Assert.assertTrue(timing.forWaiting().awaitLatch(segmentViewInitLatch));
Assert.assertTrue(timing.forWaiting().awaitLatch(segmentAddedLatch));
Map<RowKey, Long> segmentAddedCount = brokerServerView.getSegmentAddedCount();
for (Map.Entry<RowKey, Long> entry : segmentAddedCount.entrySet()) {
Assert.assertEquals(
RowKey.with(Dimension.DATASOURCE, segment.getDataSource())
.with(Dimension.DESCRIPTION, segment.getId().toString())
.with(Dimension.SERVER, druidServer.getName())
.build(),
entry.getKey()
);
Assert.assertEquals(1L, (long) entry.getValue());
}

TimelineLookup<String, ServerSelector> timeline = brokerServerView.getTimeline(
new TableDataSource("test_broker_server_view")
@@ -144,6 +159,7 @@ public void testSingleServerAddedRemovedSegment() throws Exception
0,
timeline.lookup(intervals).size()
);
Assert.assertEquals(1L, brokerServerView.getSegmentRemovedCount().size());
Assert.assertNull(timeline.findChunk(intervals, "v1", partition));
}

@@ -194,6 +210,19 @@ public void testMultipleServerAddedRemovedSegment() throws Exception
)
)
);
Map<RowKey, Long> availableSegmentCount = brokerServerView.getSegmentAddedCount();
Map<RowKey, Long> expectedSegmentCount = new HashMap<>();
for (int i = 0; i < 5; ++i) {
expectedSegmentCount.put(
RowKey.with(Dimension.DATASOURCE, segments.get(i).getDataSource())
.with(Dimension.DESCRIPTION, segments.get(i).getId().toString())
.with(Dimension.SERVER, druidServers.get(i).getName())
.build(),
1L
);
}

Assert.assertEquals(expectedSegmentCount, availableSegmentCount);

// unannounce the segment created by dataSegmentWithIntervalAndVersion("2011-04-01/2011-04-09", "v2")
unannounceSegmentForServer(druidServers.get(2), segments.get(2), zkPathsConfig);