1 change: 1 addition & 0 deletions docs/operations/metrics.md
@@ -441,6 +441,7 @@ These metrics are emitted by the Druid Coordinator in every run of the correspon
|`serverview/sync/unstableTime`|Time in milliseconds for which the Coordinator has been failing to sync with a segment-loading server. Emitted only when [HTTP-based server view](../configuration/index.md#segment-management) is enabled.|`server`, `tier`|Not emitted for synced servers.|
|`metadatacache/init/time`|Time taken to initialize the coordinator segment metadata cache.||Depends on the number of segments.|
|`segment/schemaCache/refresh/count`|Number of segments for which schema was refreshed in coordinator segment schema cache.|`dataSource`||
|`segment/schemaCache/refreshSkipped/count`|Number of segments for which schema refresh was skipped because their schema was already present in the datasource metadata polled from the Coordinator.|`dataSource`||
|`segment/schemaCache/dataSource/removed`|Emitted when a datasource is removed from the Broker cache due to segments being marked as unused.|`dataSource`||
|`segment/schemaCache/refresh/time`|Time taken to refresh segments in coordinator segment schema cache.|`dataSource`||
|`segment/schemaCache/backfill/count`|Number of segments for which schema was back filled in the database.|`dataSource`||
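For reference, a minimal sketch of how an event for the new `segment/schemaCache/refreshSkipped/count` metric would be emitted per datasource, assuming the standard `ServiceEmitter`/`ServiceMetricEvent` API; the class and method names below are illustrative only, and the actual emission site is the `BrokerSegmentMetadataCache` change further down in this diff.

```java
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
import org.apache.druid.query.DruidMetrics;

public class SkippedRefreshMetricSketch
{
  /**
   * Illustrative helper (not part of the PR): emits one event per datasource whose
   * value is the number of segments skipped because the Coordinator-polled schema
   * already covered them.
   */
  static void emitSkippedCount(ServiceEmitter emitter, String dataSource, int skippedCount)
  {
    emitter.emit(
        new ServiceMetricEvent.Builder()
            .setDimension(DruidMetrics.DATASOURCE, dataSource)
            .setMetric("segment/schemaCache/refreshSkipped/count", skippedCount)
    );
  }
}
```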
@@ -72,7 +72,12 @@ protected String runTask(TaskBuilder<?, ?, ?, ?> taskBuilder)
{
final String taskId = IdUtils.getRandomId();
cluster.callApi().runTask(taskBuilder.dataSource(dataSource).withId(taskId), overlord);
cluster.callApi().waitForAllSegmentsToBeAvailable(dataSource, coordinator, broker);
boolean useCentralizedSchema = Boolean.parseBoolean(cluster.getCommonProperties().getProperty("druid.centralizedDatasourceSchema.enabled", "false"));
if (useCentralizedSchema) {
cluster.callApi().waitForAllSegmentsToBeAvailableWithCentralizedSchema(dataSource, coordinator, broker);
} else {
cluster.callApi().waitForAllSegmentsToBeAvailable(dataSource, coordinator, broker);
}

return taskId;
}
@@ -23,7 +23,6 @@
import org.apache.druid.testing.embedded.compact.CompactionSparseColumnTest;
import org.apache.druid.testing.embedded.compact.CompactionTaskTest;
import org.apache.druid.testing.embedded.indexing.KafkaDataFormatsTest;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Nested;

/**
@@ -46,7 +45,6 @@ private static EmbeddedDruidCluster configureCluster(EmbeddedDruidCluster cluste
}

@Nested
@Disabled("Disabled due to issues with compaction task not publishing schema to broker")
public class CompactionSparseColumn extends CompactionSparseColumnTest
{
@Override
@@ -57,7 +55,6 @@ protected EmbeddedDruidCluster createCluster()
}

@Nested
@Disabled("Disabled due to issues with compaction task not publishing schema to broker")
public class CompactionTask extends CompactionTaskTest
{
@Override
@@ -68,7 +65,6 @@ protected EmbeddedDruidCluster createCluster()
}

@Nested
@Disabled("Disabled due to issues with compaction task not publishing schema to broker")
public class KafkaDataFormats extends KafkaDataFormatsTest
{
@Override
@@ -44,6 +44,7 @@ public class Metric
// Broker-side metrics
public static final String BROKER_POLL_DURATION_MILLIS = PREFIX + "poll/time";
public static final String BROKER_POLL_FAILED = PREFIX + "poll/failed";
public static final String BROKER_SEGMENTS_SKIPPED_REFRESH = PREFIX + "refreshSkipped/count";

// Schema refresh metrics
public static final String STARTUP_DURATION_MILLIS = "metadatacache/init/time";
@@ -299,6 +299,29 @@ public void waitForAllSegmentsToBeAvailable(String dataSource, EmbeddedCoordinat
);
}

/**
* Waits for all used segments (including overshadowed) of the given datasource
* to be queryable by Brokers when centralized schema is enabled.
*/
public void waitForAllSegmentsToBeAvailableWithCentralizedSchema(
String dataSource,
EmbeddedCoordinator coordinator,
EmbeddedBroker broker
)
{
final int numSegments = coordinator
.bindings()
.segmentsMetadataStorage()
.retrieveAllUsedSegments(dataSource, Segments.INCLUDING_OVERSHADOWED)
.size();

broker.latchableEmitter().waitForEventAggregate(
event -> event.hasMetricName("segment/schemaCache/refreshSkipped/count")
.hasDimension(DruidMetrics.DATASOURCE, dataSource),
agg -> agg.hasSumAtLeast(numSegments)
);
}

/**
* Returns a {@link Closeable} that deletes all the data for the given datasource
* on {@link Closeable#close()}.
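The `hasSumAtLeast(numSegments)` condition in `waitForAllSegmentsToBeAvailableWithCentralizedSchema()` above aggregates across events rather than matching a single emission, which tolerates the skip counts arriving over several Broker refresh cycles. A toy illustration of that aggregate check with hypothetical values (not the `LatchableEmitter` implementation):

```java
import java.util.List;

public class SumAtLeastSketch
{
  public static void main(String[] args)
  {
    // Hypothetical refreshSkipped/count values emitted for one datasource
    // across three Broker refresh cycles.
    List<Integer> emittedCounts = List.of(2, 1, 2);
    int numSegments = 5;  // used segments, including overshadowed ones

    // The wait completes once the running sum reaches the segment count.
    int sum = emittedCounts.stream().mapToInt(Integer::intValue).sum();
    System.out.println("latch open = " + (sum >= numSegments));  // prints: latch open = true
  }
}
```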
@@ -222,6 +222,7 @@ public void refresh(final Set<SegmentId> segmentsToRefresh, final Set<String> da
// update datasource metadata in the cache
polledDataSourceMetadata.forEach(this::updateDSMetadata);

emitMetricForSkippedSegments(segmentsToRefresh, polledDataSourceMetadata);
// Remove segments of the datasource from refresh list for which we received schema from the Coordinator.
segmentsToRefresh.removeIf(segmentId -> polledDataSourceMetadata.containsKey(segmentId.getDataSource()));

@@ -277,6 +278,29 @@ public void refresh(final Set<SegmentId> segmentsToRefresh, final Set<String> da
}
}

private void emitMetricForSkippedSegments(
Set<SegmentId> segmentsToRefresh,
Map<String, PhysicalDatasourceMetadata> polledDataSourceMetadata
)
{
final Map<String, Integer> datasourceToNumSegmentsSkipped = new HashMap<>();

for (SegmentId segmentId : segmentsToRefresh) {
if (polledDataSourceMetadata.containsKey(segmentId.getDataSource())) {
datasourceToNumSegmentsSkipped.merge(segmentId.getDataSource(), 1, Integer::sum);
}
}

datasourceToNumSegmentsSkipped.forEach(
(dataSource, count) ->
emitMetric(
Metric.BROKER_SEGMENTS_SKIPPED_REFRESH,
count,
new ServiceMetricEvent.Builder().setDimension(DruidMetrics.DATASOURCE, dataSource)
)
);
}

@Override
protected void removeSegmentAction(SegmentId segmentId)
{
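To make the counting in `emitMetricForSkippedSegments()` above concrete, here is a small self-contained sketch with made-up values: each segment still pending refresh whose datasource already has Coordinator-polled metadata increments that datasource's skip count, and one metric event is then emitted per datasource.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class SkipCountingSketch
{
  public static void main(String[] args)
  {
    // Hypothetical inputs: datasources of segments pending refresh, and the
    // datasources for which schema was polled from the Coordinator.
    List<String> pendingSegmentDataSources = List.of("wiki", "wiki", "wiki", "metrics", "orders");
    List<String> polledDataSources = List.of("wiki", "orders");

    // Same Map#merge counting pattern as emitMetricForSkippedSegments().
    Map<String, Integer> skippedPerDataSource = new HashMap<>();
    for (String dataSource : pendingSegmentDataSources) {
      if (polledDataSources.contains(dataSource)) {
        skippedPerDataSource.merge(dataSource, 1, Integer::sum);
      }
    }

    // One event per datasource: wiki=3, orders=1; "metrics" emits nothing.
    skippedPerDataSource.forEach(
        (dataSource, count) ->
            System.out.println("segment/schemaCache/refreshSkipped/count{dataSource=" + dataSource + "} = " + count)
    );
  }
}
```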