Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions docs/operations/metrics.md
Original file line number Diff line number Diff line change
Expand Up @@ -462,6 +462,8 @@ These metrics are emitted by the Druid Coordinator in every run of the correspon
|`segment/schemaCache/dataSource/removed`|Emitted when a datasource is removed from the Broker cache due to segments being marked as unused.|`dataSource`||
|`segment/schemaCache/refresh/time`|Time taken to refresh segments in coordinator segment schema cache.|`dataSource`||
|`segment/schemaCache/backfill/count`|Number of segments for which schema was backfilled in the database.|`dataSource`||
|`segment/schemaCache/rowSignature/changed`|Emitted when the cached row signature for a datasource in the Broker's segment metadata cache changes, indicating schema evolution or possible schema flapping.|`dataSource`||
|`segment/schemaCache/rowSignature/column/count`|Number of columns in the row signature on the Broker's segment metadata cache for a datasource when it's initialized or updated.|`dataSource`||
|`segment/schemaCache/realtime/count`|Number of realtime segments for which schema is cached.||Depends on the number of realtime segments in the cluster.|
|`segment/schemaCache/used/count`|Number of published used segments for which schema is cached.||Depends on the number of segments in the cluster.|
|`segment/schemaCache/usedFingerprint/count`|Number of unique schema fingerprints cached for published used segments.||Depends on the number of distinct schema in the cluster.|
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,8 @@ public class Metric
public static final String REFRESH_SKIPPED_TOMBSTONES = PREFIX + "refresh/tombstone/count";
public static final String REFRESH_DURATION_MILLIS = PREFIX + "refresh/time";
public static final String DATASOURCE_REMOVED = PREFIX + "dataSource/removed";
public static final String SCHEMA_ROW_SIGNATURE_CHANGED = PREFIX + "rowSignature/changed";
public static final String SCHEMA_ROW_SIGNATURE_COLUMN_COUNT = PREFIX + "rowSignature/column/count";

/**
* Number of used cold segments in the metadata store.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -358,10 +358,33 @@ private Map<String, PhysicalDatasourceMetadata> queryDataSourceInformation(Set<S
private void updateDSMetadata(String dataSource, PhysicalDatasourceMetadata physicalDatasourceMetadata)
{
final PhysicalDatasourceMetadata oldTable = tables.put(dataSource, physicalDatasourceMetadata);
if (oldTable == null || !oldTable.getRowSignature().equals(physicalDatasourceMetadata.getRowSignature())) {
log.info("[%s] has new signature: %s.", dataSource, physicalDatasourceMetadata.getRowSignature());
final RowSignature newRowSignature = physicalDatasourceMetadata.getRowSignature();
final int newColumnCount = newRowSignature.getColumnNames().size();

final ServiceMetricEvent.Builder builder =
new ServiceMetricEvent.Builder().setDimension(DruidMetrics.DATASOURCE, dataSource);

if (oldTable == null) {
log.info(
"Row signature for datasource[%s] initialized with [%d] columns - signature[%s]",
dataSource, newColumnCount, newRowSignature
);

emitMetric(Metric.SCHEMA_ROW_SIGNATURE_COLUMN_COUNT, newColumnCount, builder);
return;
}

final RowSignature oldRowSignature = oldTable.getRowSignature();
if (!oldRowSignature.equals(newRowSignature)) {
log.info(
"Row signature for datasource[%s] updated from [%d] columns to [%d] columns - new signature[%s]",
dataSource, oldRowSignature.getColumnNames().size(), newColumnCount, newRowSignature
);

emitMetric(Metric.SCHEMA_ROW_SIGNATURE_CHANGED, 1, builder);
emitMetric(Metric.SCHEMA_ROW_SIGNATURE_COLUMN_COUNT, newColumnCount, builder);
} else {
log.debug("[%s] signature is unchanged.", dataSource);
log.debug("Row signature for datasource[%s] is unchanged.", dataSource);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,6 @@
import org.apache.druid.server.QueryResponse;
import org.apache.druid.server.coordination.DruidServerMetadata;
import org.apache.druid.server.coordination.ServerType;
import org.apache.druid.server.metrics.NoopServiceEmitter;
import org.apache.druid.server.security.AllowAllAuthenticator;
import org.apache.druid.server.security.AuthorizationResult;
import org.apache.druid.server.security.NoopEscalator;
Expand Down Expand Up @@ -117,6 +116,7 @@ public class BrokerSegmentMetadataCacheTest extends BrokerSegmentMetadataCacheTe
private static final int WAIT_TIMEOUT_SECS = 6;
private static final ObjectMapper MAPPER = TestHelper.makeJsonMapper();
private BrokerSegmentMetadataCache runningSchema;
private StubServiceEmitter emitter = new StubServiceEmitter();
private CountDownLatch buildTableLatch = new CountDownLatch(1);
private CountDownLatch markDataSourceLatch = new CountDownLatch(1);
private CountDownLatch refreshLatch = new CountDownLatch(1);
Expand Down Expand Up @@ -156,7 +156,7 @@ public BrokerSegmentMetadataCache buildSchemaMarkAndTableLatch(
config,
new NoopEscalator(),
new InternalQueryConfig(),
new NoopServiceEmitter(),
emitter,
new PhysicalDatasourceMetadataFactory(globalTableJoinable, segmentManager),
coordinatorClient,
CentralizedDatasourceSchemaConfig.create()
Expand Down Expand Up @@ -192,7 +192,7 @@ public BrokerSegmentMetadataCache buildSchemaMarkAndRefreshLatch() throws Interr
SEGMENT_CACHE_CONFIG_DEFAULT,
new NoopEscalator(),
new InternalQueryConfig(),
new NoopServiceEmitter(),
emitter,
new PhysicalDatasourceMetadataFactory(globalTableJoinable, segmentManager),
new NoopCoordinatorClient(),
CentralizedDatasourceSchemaConfig.create()
Expand Down Expand Up @@ -260,7 +260,7 @@ public ListenableFuture<List<DataSourceInformation>> fetchDataSourceInformation(
SEGMENT_CACHE_CONFIG_DEFAULT,
new NoopEscalator(),
new InternalQueryConfig(),
new NoopServiceEmitter(),
emitter,
new PhysicalDatasourceMetadataFactory(globalTableJoinable, segmentManager),
coordinatorClient,
CentralizedDatasourceSchemaConfig.create()
Expand Down Expand Up @@ -340,7 +340,7 @@ public ListenableFuture<List<DataSourceInformation>> fetchDataSourceInformation(
SEGMENT_CACHE_CONFIG_DEFAULT,
new NoopEscalator(),
new InternalQueryConfig(),
new NoopServiceEmitter(),
emitter,
new PhysicalDatasourceMetadataFactory(globalTableJoinable, segmentManager),
coordinatorClient,
CentralizedDatasourceSchemaConfig.create()
Expand Down Expand Up @@ -373,7 +373,7 @@ public void testBrokerPollsAllDSSchema() throws InterruptedException
SEGMENT_CACHE_CONFIG_DEFAULT,
new NoopEscalator(),
new InternalQueryConfig(),
new NoopServiceEmitter(),
emitter,
new PhysicalDatasourceMetadataFactory(globalTableJoinable, segmentManager),
coordinatorClient,
CentralizedDatasourceSchemaConfig.create()
Expand Down Expand Up @@ -409,7 +409,7 @@ public void testRefreshOnEachCycleCentralizedDatasourceSchemaEnabled() throws In
SEGMENT_CACHE_CONFIG_DEFAULT,
new NoopEscalator(),
new InternalQueryConfig(),
new NoopServiceEmitter(),
emitter,
new PhysicalDatasourceMetadataFactory(globalTableJoinable, segmentManager),
new NoopCoordinatorClient(),
config
Expand All @@ -431,6 +431,10 @@ public void refresh(Set<SegmentId> segmentsToRefresh, Set<String> dataSourcesToR

refreshLatch.await(WAIT_TIMEOUT_SECS, TimeUnit.SECONDS);

emitter.verifyValue(Metric.SCHEMA_ROW_SIGNATURE_COLUMN_COUNT, Map.of(DruidMetrics.DATASOURCE, "foo"), 6L);
emitter.verifyValue(Metric.SCHEMA_ROW_SIGNATURE_COLUMN_COUNT, Map.of(DruidMetrics.DATASOURCE, "foo2"), 3L);
emitter.verifyValue(Metric.SCHEMA_ROW_SIGNATURE_COLUMN_COUNT, Map.of(DruidMetrics.DATASOURCE, "some_datasource"), 9L);

Assert.assertEquals(0, refreshLatch.getCount());
}

Expand All @@ -449,7 +453,7 @@ public void testRefreshOnEachCycleCentralizedDatasourceSchemaDisabled() throws I
SEGMENT_CACHE_CONFIG_DEFAULT,
new NoopEscalator(),
new InternalQueryConfig(),
new NoopServiceEmitter(),
emitter,
new PhysicalDatasourceMetadataFactory(globalTableJoinable, segmentManager),
new NoopCoordinatorClient(),
CentralizedDatasourceSchemaConfig.create()
Expand Down Expand Up @@ -726,7 +730,7 @@ public void testAllDatasourcesRebuiltOnDatasourceRemoval() throws IOException, I
SEGMENT_CACHE_CONFIG_DEFAULT,
new NoopEscalator(),
new InternalQueryConfig(),
new NoopServiceEmitter(),
emitter,
new PhysicalDatasourceMetadataFactory(globalTableJoinable, segmentManager),
new NoopCoordinatorClient(),
CentralizedDatasourceSchemaConfig.create()
Expand Down Expand Up @@ -825,6 +829,8 @@ public void refresh(

Assert.assertTrue(addSegmentLatch.await(1, TimeUnit.SECONDS));

emitter.verifyValue(Metric.SCHEMA_ROW_SIGNATURE_COLUMN_COUNT, Map.of(DruidMetrics.DATASOURCE, "foo"), 6L);

Set<String> dataSources = segments.stream().map(DataSegment::getDataSource).collect(Collectors.toSet());
dataSources.remove("foo2");

Expand All @@ -843,6 +849,9 @@ public void refresh(

fooDs = schema.getDatasource("foo");

emitter.verifyValue(Metric.SCHEMA_ROW_SIGNATURE_CHANGED, Map.of(DruidMetrics.DATASOURCE, "foo"), 1L);
emitter.verifySum(Metric.SCHEMA_ROW_SIGNATURE_COLUMN_COUNT, Map.of(DruidMetrics.DATASOURCE, "foo"), 13L);

// check if the new column present in the added segment is present in the datasource schema
// ensuring that the schema is rebuilt
Assert.assertTrue(fooDs.getRowSignature().getColumnNames().stream().anyMatch("dim3"::equals));
Expand Down Expand Up @@ -987,6 +996,11 @@ public void testLocalSegmentCacheSetsDataSourceAsBroadcastButNotJoinable() throw
refreshLatch = new CountDownLatch(1);
Assert.assertTrue(refreshLatch.await(WAIT_TIMEOUT_SECS, TimeUnit.SECONDS));

emitter.verifyValue(Metric.SCHEMA_ROW_SIGNATURE_COLUMN_COUNT, Map.of(DruidMetrics.DATASOURCE, "foo"), 6L);
emitter.verifyValue(Metric.SCHEMA_ROW_SIGNATURE_COLUMN_COUNT, Map.of(DruidMetrics.DATASOURCE, "foo2"), 3L);
emitter.verifyValue(Metric.SCHEMA_ROW_SIGNATURE_COLUMN_COUNT, Map.of(DruidMetrics.DATASOURCE, "some_datasource"), 9L);


fooTable = schema.getDatasource("foo");
Assert.assertNotNull(fooTable);
Assert.assertTrue(fooTable.dataSource() instanceof TableDataSource);
Expand Down Expand Up @@ -1043,7 +1057,7 @@ public void testRunSegmentMetadataQueryWithContext() throws Exception
SEGMENT_CACHE_CONFIG_DEFAULT,
new NoopEscalator(),
internalQueryConfig,
new NoopServiceEmitter(),
emitter,
new PhysicalDatasourceMetadataFactory(globalTableJoinable, segmentManager),
new NoopCoordinatorClient(),
CentralizedDatasourceSchemaConfig.create()
Expand Down Expand Up @@ -1108,7 +1122,6 @@ public void testRefreshShouldEmitMetrics() throws InterruptedException, IOExcept
{
String dataSource = "xyz";
CountDownLatch addSegmentLatch = new CountDownLatch(2);
StubServiceEmitter emitter = new StubServiceEmitter("broker", "host");
BrokerSegmentMetadataCache schema = new BrokerSegmentMetadataCache(
CalciteTests.createMockQueryLifecycleFactory(walker, conglomerate),
serverView,
Expand Down Expand Up @@ -1204,7 +1217,7 @@ public void testTombstoneSegmentIsNotRefreshed() throws IOException
SEGMENT_CACHE_CONFIG_DEFAULT,
new NoopEscalator(),
internalQueryConfig,
new NoopServiceEmitter(),
emitter,
new PhysicalDatasourceMetadataFactory(globalTableJoinable, segmentManager),
new NoopCoordinatorClient(),
CentralizedDatasourceSchemaConfig.create()
Expand Down
Loading