From 6c9c13c8c8cc476170229d02aabce4bd3bcd67bf Mon Sep 17 00:00:00 2001 From: Nandini Singhal Date: Fri, 5 Dec 2025 14:50:44 +0100 Subject: [PATCH] KAFKA-19969: Add Per-Topic Observability Metrics for Remote Fetch Quota Enforcement Remote fetch quota enforcement lacks per-topic visibility, making it difficult to identify which topics consume the most quota or experience throttling. Add 4 new per-topic metrics: - RemoteFetchQuotaBytesPerSec: quota bytes consumed per topic - RemoteFetchQuotaThrottledPerSec: throttled requests per topic - RemoteFetchQuotaThrottleTimeMs: throttle time distribution - RemoteFetchSizeBytes: fetch size distribution --- .../scala/kafka/server/ReplicaManager.scala | 2 +- .../log/remote/quota/RLMQuotaManager.java | 48 +++++++++- .../log/remote/storage/RemoteLogManager.java | 8 +- .../log/remote/storage/RemoteLogReader.java | 20 ++++- .../log/metrics/BrokerTopicMetrics.java | 90 ++++++++++++++++++- .../remote/storage/RemoteLogReaderTest.java | 12 ++- 6 files changed, 166 insertions(+), 14 deletions(-) diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala index 4ee24f2e41428..9d70236fb507f 100644 --- a/core/src/main/scala/kafka/server/ReplicaManager.scala +++ b/core/src/main/scala/kafka/server/ReplicaManager.scala @@ -1925,7 +1925,7 @@ class ReplicaManager(val config: KafkaConfig, createLogReadResult(highWatermark, leaderLogStartOffset, leaderLogEndOffset, new OffsetMovedToTieredStorageException("Given offset" + offset + " is moved to tiered storage")) } else { - val throttleTimeMs = remoteLogManager.get.getFetchThrottleTimeMs + val throttleTimeMs = remoteLogManager.get.getFetchThrottleTimeMsWithTopic(tp.topicPartition.topic()) val fetchDataInfo = if (throttleTimeMs > 0) { // Record the throttle time for the remote log fetches remoteLogManager.get.fetchThrottleTimeSensor().record(throttleTimeMs, time.milliseconds()) diff --git a/storage/src/main/java/org/apache/kafka/server/log/remote/quota/RLMQuotaManager.java b/storage/src/main/java/org/apache/kafka/server/log/remote/quota/RLMQuotaManager.java index 37119d7c4b784..e2ebb1cf0643d 100644 --- a/storage/src/main/java/org/apache/kafka/server/log/remote/quota/RLMQuotaManager.java +++ b/storage/src/main/java/org/apache/kafka/server/log/remote/quota/RLMQuotaManager.java @@ -28,6 +28,7 @@ import org.apache.kafka.server.quota.QuotaType; import org.apache.kafka.server.quota.QuotaUtils; import org.apache.kafka.server.quota.SensorAccess; +import org.apache.kafka.storage.log.metrics.BrokerTopicStats; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -44,17 +45,23 @@ public class RLMQuotaManager { private final QuotaType quotaType; private final String description; private final Time time; + private final BrokerTopicStats brokerTopicStats; private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); private final SensorAccess sensorAccess; private Quota quota; public RLMQuotaManager(RLMQuotaManagerConfig config, Metrics metrics, QuotaType quotaType, String description, Time time) { + this(config, metrics, quotaType, description, time, null); + } + + public RLMQuotaManager(RLMQuotaManagerConfig config, Metrics metrics, QuotaType quotaType, String description, Time time, BrokerTopicStats brokerTopicStats) { this.config = config; this.metrics = metrics; this.quotaType = quotaType; this.description = description; this.time = time; + this.brokerTopicStats = brokerTopicStats; this.quota = new Quota(config.quotaBytesPerSecond(), true); this.sensorAccess = new SensorAccess(lock, metrics); @@ -78,13 +85,36 @@ public void updateQuota(Quota newQuota) { } public long getThrottleTimeMs() { + return getThrottleTimeMsWithTopic(null); + } + + public long getThrottleTimeMsWithTopic(String topic) { Sensor sensorInstance = sensor(); try { sensorInstance.checkQuotas(); } catch (QuotaViolationException qve) { + long throttleTimeMs = QuotaUtils.throttleTime(qve, time.milliseconds()); + LOGGER.debug("Quota violated for sensor ({}), metric: ({}), metric-value: ({}), bound: ({})", sensorInstance.name(), qve.metric().metricName(), qve.value(), qve.bound()); - return QuotaUtils.throttleTime(qve, time.milliseconds()); + + // Record throttling metrics if topic provided + try { + if (brokerTopicStats != null && topic != null) { + var throttledMeter = brokerTopicStats.topicStats(topic).remoteFetchQuotaThrottledRate(); + if (throttledMeter != null) { + throttledMeter.mark(); + } + var throttleTimeHistogram = brokerTopicStats.topicStats(topic).remoteFetchQuotaThrottleTimeMs(); + if (throttleTimeHistogram != null) { + throttleTimeHistogram.update(throttleTimeMs); + } + } + } catch (Exception e) { + LOGGER.warn("Failed to record throttling metrics for topic {}: {}", topic, e.getMessage()); + } + + return throttleTimeMs; } return 0L; } @@ -93,6 +123,22 @@ public void record(double value) { sensor().record(value, time.milliseconds(), false); } + /** + * Record quota usage with topic context for observability + */ + public void recordWithTopic(String topic, double value) { + // Record global quota + sensor().record(value, time.milliseconds(), false); + + // Record per-topic quota usage if brokerTopicStats available + if (brokerTopicStats != null && value > 0) { + var meter = brokerTopicStats.topicStats(topic).remoteFetchQuotaBytesRate(); + if (meter != null) { + meter.mark((long) value); + } + } + } + private MetricConfig getQuotaMetricConfig(Quota quota) { return new MetricConfig() .timeWindow(config.quotaWindowSizeSeconds(), TimeUnit.SECONDS) diff --git a/storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManager.java b/storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManager.java index a9848633effa3..89dea13f2085f 100644 --- a/storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManager.java +++ b/storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManager.java @@ -337,18 +337,22 @@ Duration quotaTimeout() { RLMQuotaManager createRLMCopyQuotaManager() { return new RLMQuotaManager(copyQuotaManagerConfig(rlmConfig), metrics, QuotaType.RLM_COPY, - "Tracking copy byte-rate for Remote Log Manager", time); + "Tracking copy byte-rate for Remote Log Manager", time, brokerTopicStats); } RLMQuotaManager createRLMFetchQuotaManager() { return new RLMQuotaManager(fetchQuotaManagerConfig(rlmConfig), metrics, QuotaType.RLM_FETCH, - "Tracking fetch byte-rate for Remote Log Manager", time); + "Tracking fetch byte-rate for Remote Log Manager", time, brokerTopicStats); } public long getFetchThrottleTimeMs() { return rlmFetchQuotaManager.getThrottleTimeMs(); } + public long getFetchThrottleTimeMsWithTopic(String topic) { + return rlmFetchQuotaManager.getThrottleTimeMsWithTopic(topic); + } + public Sensor fetchThrottleTimeSensor() { return fetchQuotaMetrics.sensor(); } diff --git a/storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogReader.java b/storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogReader.java index afcba812e1ca2..86252075f1b14 100644 --- a/storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogReader.java +++ b/storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogReader.java @@ -60,22 +60,34 @@ public RemoteLogReader(RemoteStorageFetchInfo fetchInfo, @Override public Void call() { RemoteLogReadResult result; + String topic = fetchInfo.topicIdPartition().topic(); + try { LOGGER.debug("Reading records from remote storage for topic partition {}", fetchInfo.topicIdPartition()); FetchDataInfo fetchDataInfo = remoteReadTimer.time(() -> rlm.read(fetchInfo)); - brokerTopicStats.topicStats(fetchInfo.topicIdPartition().topic()).remoteFetchBytesRate().mark(fetchDataInfo.records.sizeInBytes()); - brokerTopicStats.allTopicsStats().remoteFetchBytesRate().mark(fetchDataInfo.records.sizeInBytes()); + int fetchedBytes = fetchDataInfo.records.sizeInBytes(); + + brokerTopicStats.topicStats(topic).remoteFetchBytesRate().mark(fetchedBytes); + brokerTopicStats.allTopicsStats().remoteFetchBytesRate().mark(fetchedBytes); + + // Record fetch size distribution for quota analysis + var fetchSizeHistogram = brokerTopicStats.topicStats(topic).remoteFetchSizeBytes(); + if (fetchSizeHistogram != null) { + fetchSizeHistogram.update(fetchedBytes); + } + result = new RemoteLogReadResult(Optional.of(fetchDataInfo), Optional.empty()); } catch (OffsetOutOfRangeException e) { result = new RemoteLogReadResult(Optional.empty(), Optional.of(e)); } catch (Exception e) { - brokerTopicStats.topicStats(fetchInfo.topicIdPartition().topic()).failedRemoteFetchRequestRate().mark(); + brokerTopicStats.topicStats(topic).failedRemoteFetchRequestRate().mark(); brokerTopicStats.allTopicsStats().failedRemoteFetchRequestRate().mark(); LOGGER.error("Error occurred while reading the remote data for {}", fetchInfo.topicIdPartition(), e); result = new RemoteLogReadResult(Optional.empty(), Optional.of(e)); } + LOGGER.debug("Finished reading records from remote storage for topic partition {}", fetchInfo.topicIdPartition()); - quotaManager.record(result.fetchDataInfo().map(fetchDataInfo -> fetchDataInfo.records.sizeInBytes()).orElse(0)); + quotaManager.recordWithTopic(topic, result.fetchDataInfo().map(fetchDataInfo -> fetchDataInfo.records.sizeInBytes()).orElse(0)); callback.accept(result); return null; } diff --git a/storage/src/main/java/org/apache/kafka/storage/log/metrics/BrokerTopicMetrics.java b/storage/src/main/java/org/apache/kafka/storage/log/metrics/BrokerTopicMetrics.java index 08ba069cc3c43..47237ef2d5fad 100644 --- a/storage/src/main/java/org/apache/kafka/storage/log/metrics/BrokerTopicMetrics.java +++ b/storage/src/main/java/org/apache/kafka/storage/log/metrics/BrokerTopicMetrics.java @@ -48,6 +48,11 @@ public final class BrokerTopicMetrics { public static final String FAILED_SHARE_FETCH_REQUESTS_PER_SEC = "FailedShareFetchRequestsPerSec"; public static final String TOTAL_SHARE_ACKNOWLEDGEMENTS_REQUESTS_PER_SEC = "TotalShareAcknowledgementRequestsPerSec"; public static final String FAILED_SHARE_ACKNOWLEDGEMENTS_REQUESTS_PER_SEC = "FailedShareAcknowledgementRequestsPerSec"; + // Quota observability metrics + public static final String REMOTE_FETCH_QUOTA_BYTES_PER_SEC = "RemoteFetchQuotaBytesPerSec"; + public static final String REMOTE_FETCH_QUOTA_THROTTLED_PER_SEC = "RemoteFetchQuotaThrottledPerSec"; + public static final String REMOTE_FETCH_QUOTA_THROTTLE_TIME_MS = "RemoteFetchQuotaThrottleTimeMs"; + public static final String REMOTE_FETCH_SIZE_BYTES = "RemoteFetchSizeBytes"; // These following topics are for LogValidator for better debugging on failed records public static final String NO_KEY_COMPACTED_TOPIC_RECORDS_PER_SEC = "NoKeyCompactedTopicRecordsPerSec"; public static final String INVALID_MAGIC_NUMBER_RECORDS_PER_SEC = "InvalidMagicNumberRecordsPerSec"; @@ -60,6 +65,7 @@ public final class BrokerTopicMetrics { private final Map tags; private final Map metricTypeMap = new java.util.HashMap<>(); private final Map metricGaugeTypeMap = new java.util.HashMap<>(); + private final Map metricHistogramTypeMap = new java.util.HashMap<>(); public BrokerTopicMetrics(boolean remoteStorageEnabled) { this(Optional.empty(), remoteStorageEnabled); @@ -117,19 +123,34 @@ private BrokerTopicMetrics(Optional name, boolean remoteStorageEnabled) metricGaugeTypeMap.put(RemoteStorageMetrics.REMOTE_LOG_METADATA_COUNT_METRIC.getName(), new GaugeWrapper(RemoteStorageMetrics.REMOTE_LOG_METADATA_COUNT_METRIC.getName())); metricGaugeTypeMap.put(RemoteStorageMetrics.REMOTE_LOG_SIZE_COMPUTATION_TIME_METRIC.getName(), new GaugeWrapper(RemoteStorageMetrics.REMOTE_LOG_SIZE_COMPUTATION_TIME_METRIC.getName())); metricGaugeTypeMap.put(RemoteStorageMetrics.REMOTE_LOG_SIZE_BYTES_METRIC.getName(), new GaugeWrapper(RemoteStorageMetrics.REMOTE_LOG_SIZE_BYTES_METRIC.getName())); + + // Quota observability metrics + metricTypeMap.put(REMOTE_FETCH_QUOTA_BYTES_PER_SEC, new MeterWrapper(REMOTE_FETCH_QUOTA_BYTES_PER_SEC, "bytes")); + metricTypeMap.put(REMOTE_FETCH_QUOTA_THROTTLED_PER_SEC, new MeterWrapper(REMOTE_FETCH_QUOTA_THROTTLED_PER_SEC, "requests")); + metricHistogramTypeMap.put(REMOTE_FETCH_QUOTA_THROTTLE_TIME_MS, new HistogramWrapper(REMOTE_FETCH_QUOTA_THROTTLE_TIME_MS)); + metricHistogramTypeMap.put(REMOTE_FETCH_SIZE_BYTES, new HistogramWrapper(REMOTE_FETCH_SIZE_BYTES)); } } public void closeMetric(String metricName) { MeterWrapper mw = metricTypeMap.get(metricName); - if (mw != null) mw.close(); + if (mw != null) { + mw.close(); + } GaugeWrapper mg = metricGaugeTypeMap.get(metricName); - if (mg != null) mg.close(); + if (mg != null) { + mg.close(); + } + HistogramWrapper hg = metricHistogramTypeMap.get(metricName); + if (hg != null) { + hg.close(); + } } public void close() { metricTypeMap.values().forEach(MeterWrapper::close); metricGaugeTypeMap.values().forEach(GaugeWrapper::close); + metricHistogramTypeMap.values().forEach(HistogramWrapper::close); } // used for testing only @@ -345,6 +366,25 @@ public Meter failedBuildRemoteLogAuxStateRate() { return metricTypeMap.get(RemoteStorageMetrics.FAILED_BUILD_REMOTE_LOG_AUX_STATE_PER_SEC_METRIC.getName()).meter(); } + // Quota observability metrics + public Meter remoteFetchQuotaBytesRate() { + MeterWrapper wrapper = metricTypeMap.get(REMOTE_FETCH_QUOTA_BYTES_PER_SEC); + return wrapper != null ? wrapper.meter() : null; + } + + public Meter remoteFetchQuotaThrottledRate() { + MeterWrapper wrapper = metricTypeMap.get(REMOTE_FETCH_QUOTA_THROTTLED_PER_SEC); + return wrapper != null ? wrapper.meter() : null; + } + + public HistogramWrapper remoteFetchQuotaThrottleTimeMs() { + return metricHistogramTypeMap.get(REMOTE_FETCH_QUOTA_THROTTLE_TIME_MS); + } + + public HistogramWrapper remoteFetchSizeBytes() { + return metricHistogramTypeMap.get(REMOTE_FETCH_SIZE_BYTES); + } + private class MeterWrapper { private final String metricType; private final String eventType; @@ -425,4 +465,50 @@ private void newGaugeIfNeed() { metricsGroup.newGauge(metricType, () -> metricValues.values().stream().mapToLong(v -> v).sum(), tags); } } + + public class HistogramWrapper { + private final String metricType; + private volatile com.yammer.metrics.core.Histogram lazyHistogram; + private final Lock histogramLock = new ReentrantLock(); + + public HistogramWrapper(String metricType) { + this.metricType = metricType; + if (tags.isEmpty()) { + histogram(); + } + } + + public com.yammer.metrics.core.Histogram histogram() { + com.yammer.metrics.core.Histogram histogram = lazyHistogram; + if (histogram == null) { + histogramLock.lock(); + try { + histogram = lazyHistogram; + if (histogram == null) { + histogram = metricsGroup.newHistogram(metricType, true, tags); + lazyHistogram = histogram; + } + } finally { + histogramLock.unlock(); + } + } + return histogram; + } + + public void update(long value) { + histogram().update(value); + } + + public void close() { + histogramLock.lock(); + try { + if (lazyHistogram != null) { + metricsGroup.removeMetric(metricType, tags); + lazyHistogram = null; + } + } finally { + histogramLock.unlock(); + } + } + } } diff --git a/storage/src/test/java/org/apache/kafka/server/log/remote/storage/RemoteLogReaderTest.java b/storage/src/test/java/org/apache/kafka/server/log/remote/storage/RemoteLogReaderTest.java index ce80183d58d65..fd7d8b8537186 100644 --- a/storage/src/test/java/org/apache/kafka/server/log/remote/storage/RemoteLogReaderTest.java +++ b/storage/src/test/java/org/apache/kafka/server/log/remote/storage/RemoteLogReaderTest.java @@ -83,9 +83,11 @@ public void testRemoteLogReaderWithoutError() throws RemoteStorageException, IOE assertTrue(actualRemoteLogReadResult.fetchDataInfo().isPresent()); assertEquals(fetchDataInfo, actualRemoteLogReadResult.fetchDataInfo().get()); - // verify the record method on quota manager was called with the expected value + // verify the recordWithTopic method on quota manager was called with the expected value + ArgumentCaptor topicArg = ArgumentCaptor.forClass(String.class); ArgumentCaptor recordedArg = ArgumentCaptor.forClass(Double.class); - verify(mockQuotaManager, times(1)).record(recordedArg.capture()); + verify(mockQuotaManager, times(1)).recordWithTopic(topicArg.capture(), recordedArg.capture()); + assertEquals(TOPIC, topicArg.getValue()); assertEquals(100, recordedArg.getValue()); // Verify metrics for remote reads are updated correctly @@ -115,9 +117,11 @@ public void testRemoteLogReaderWithError() throws RemoteStorageException, IOExce assertTrue(actualRemoteLogReadResult.error().isPresent()); assertFalse(actualRemoteLogReadResult.fetchDataInfo().isPresent()); - // verify the record method on quota manager was called with the expected value + // verify the recordWithTopic method on quota manager was called with the expected value + ArgumentCaptor topicArg = ArgumentCaptor.forClass(String.class); ArgumentCaptor recordedArg = ArgumentCaptor.forClass(Double.class); - verify(mockQuotaManager, times(1)).record(recordedArg.capture()); + verify(mockQuotaManager, times(1)).recordWithTopic(topicArg.capture(), recordedArg.capture()); + assertEquals(TOPIC, topicArg.getValue()); assertEquals(0, recordedArg.getValue()); // Verify metrics for remote reads are updated correctly