Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion core/src/main/scala/kafka/server/ReplicaManager.scala
Original file line number Diff line number Diff line change
Expand Up @@ -1925,7 +1925,7 @@ class ReplicaManager(val config: KafkaConfig,
createLogReadResult(highWatermark, leaderLogStartOffset, leaderLogEndOffset,
new OffsetMovedToTieredStorageException("Given offset" + offset + " is moved to tiered storage"))
} else {
val throttleTimeMs = remoteLogManager.get.getFetchThrottleTimeMs
val throttleTimeMs = remoteLogManager.get.getFetchThrottleTimeMsWithTopic(tp.topicPartition.topic())
val fetchDataInfo = if (throttleTimeMs > 0) {
// Record the throttle time for the remote log fetches
remoteLogManager.get.fetchThrottleTimeSensor().record(throttleTimeMs, time.milliseconds())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.apache.kafka.server.quota.QuotaType;
import org.apache.kafka.server.quota.QuotaUtils;
import org.apache.kafka.server.quota.SensorAccess;
import org.apache.kafka.storage.log.metrics.BrokerTopicStats;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -44,17 +45,23 @@ public class RLMQuotaManager {
private final QuotaType quotaType;
private final String description;
private final Time time;
private final BrokerTopicStats brokerTopicStats;

private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
private final SensorAccess sensorAccess;
private Quota quota;

public RLMQuotaManager(RLMQuotaManagerConfig config, Metrics metrics, QuotaType quotaType, String description, Time time) {
this(config, metrics, quotaType, description, time, null);
}

public RLMQuotaManager(RLMQuotaManagerConfig config, Metrics metrics, QuotaType quotaType, String description, Time time, BrokerTopicStats brokerTopicStats) {
this.config = config;
this.metrics = metrics;
this.quotaType = quotaType;
this.description = description;
this.time = time;
this.brokerTopicStats = brokerTopicStats;

this.quota = new Quota(config.quotaBytesPerSecond(), true);
this.sensorAccess = new SensorAccess(lock, metrics);
Expand All @@ -78,13 +85,36 @@ public void updateQuota(Quota newQuota) {
}

public long getThrottleTimeMs() {
return getThrottleTimeMsWithTopic(null);
}

public long getThrottleTimeMsWithTopic(String topic) {
Sensor sensorInstance = sensor();
try {
sensorInstance.checkQuotas();
} catch (QuotaViolationException qve) {
long throttleTimeMs = QuotaUtils.throttleTime(qve, time.milliseconds());

LOGGER.debug("Quota violated for sensor ({}), metric: ({}), metric-value: ({}), bound: ({})",
sensorInstance.name(), qve.metric().metricName(), qve.value(), qve.bound());
return QuotaUtils.throttleTime(qve, time.milliseconds());

// Record throttling metrics if topic provided
try {
if (brokerTopicStats != null && topic != null) {
var throttledMeter = brokerTopicStats.topicStats(topic).remoteFetchQuotaThrottledRate();
if (throttledMeter != null) {
throttledMeter.mark();
}
var throttleTimeHistogram = brokerTopicStats.topicStats(topic).remoteFetchQuotaThrottleTimeMs();
if (throttleTimeHistogram != null) {
throttleTimeHistogram.update(throttleTimeMs);
}
}
} catch (Exception e) {
LOGGER.warn("Failed to record throttling metrics for topic {}: {}", topic, e.getMessage());
}

return throttleTimeMs;
}
return 0L;
}
Expand All @@ -93,6 +123,22 @@ public void record(double value) {
sensor().record(value, time.milliseconds(), false);
}

/**
* Record quota usage with topic context for observability
*/
public void recordWithTopic(String topic, double value) {
// Record global quota
sensor().record(value, time.milliseconds(), false);

// Record per-topic quota usage if brokerTopicStats available
if (brokerTopicStats != null && value > 0) {
var meter = brokerTopicStats.topicStats(topic).remoteFetchQuotaBytesRate();
if (meter != null) {
meter.mark((long) value);
}
}
}

private MetricConfig getQuotaMetricConfig(Quota quota) {
return new MetricConfig()
.timeWindow(config.quotaWindowSizeSeconds(), TimeUnit.SECONDS)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -337,18 +337,22 @@ Duration quotaTimeout() {

RLMQuotaManager createRLMCopyQuotaManager() {
return new RLMQuotaManager(copyQuotaManagerConfig(rlmConfig), metrics, QuotaType.RLM_COPY,
"Tracking copy byte-rate for Remote Log Manager", time);
"Tracking copy byte-rate for Remote Log Manager", time, brokerTopicStats);
}

RLMQuotaManager createRLMFetchQuotaManager() {
return new RLMQuotaManager(fetchQuotaManagerConfig(rlmConfig), metrics, QuotaType.RLM_FETCH,
"Tracking fetch byte-rate for Remote Log Manager", time);
"Tracking fetch byte-rate for Remote Log Manager", time, brokerTopicStats);
}

public long getFetchThrottleTimeMs() {
return rlmFetchQuotaManager.getThrottleTimeMs();
}

public long getFetchThrottleTimeMsWithTopic(String topic) {
return rlmFetchQuotaManager.getThrottleTimeMsWithTopic(topic);
}

public Sensor fetchThrottleTimeSensor() {
return fetchQuotaMetrics.sensor();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,22 +60,34 @@ public RemoteLogReader(RemoteStorageFetchInfo fetchInfo,
@Override
public Void call() {
RemoteLogReadResult result;
String topic = fetchInfo.topicIdPartition().topic();

try {
LOGGER.debug("Reading records from remote storage for topic partition {}", fetchInfo.topicIdPartition());
FetchDataInfo fetchDataInfo = remoteReadTimer.time(() -> rlm.read(fetchInfo));
brokerTopicStats.topicStats(fetchInfo.topicIdPartition().topic()).remoteFetchBytesRate().mark(fetchDataInfo.records.sizeInBytes());
brokerTopicStats.allTopicsStats().remoteFetchBytesRate().mark(fetchDataInfo.records.sizeInBytes());
int fetchedBytes = fetchDataInfo.records.sizeInBytes();

brokerTopicStats.topicStats(topic).remoteFetchBytesRate().mark(fetchedBytes);
brokerTopicStats.allTopicsStats().remoteFetchBytesRate().mark(fetchedBytes);

// Record fetch size distribution for quota analysis
var fetchSizeHistogram = brokerTopicStats.topicStats(topic).remoteFetchSizeBytes();
if (fetchSizeHistogram != null) {
fetchSizeHistogram.update(fetchedBytes);
}

result = new RemoteLogReadResult(Optional.of(fetchDataInfo), Optional.empty());
} catch (OffsetOutOfRangeException e) {
result = new RemoteLogReadResult(Optional.empty(), Optional.of(e));
} catch (Exception e) {
brokerTopicStats.topicStats(fetchInfo.topicIdPartition().topic()).failedRemoteFetchRequestRate().mark();
brokerTopicStats.topicStats(topic).failedRemoteFetchRequestRate().mark();
brokerTopicStats.allTopicsStats().failedRemoteFetchRequestRate().mark();
LOGGER.error("Error occurred while reading the remote data for {}", fetchInfo.topicIdPartition(), e);
result = new RemoteLogReadResult(Optional.empty(), Optional.of(e));
}

LOGGER.debug("Finished reading records from remote storage for topic partition {}", fetchInfo.topicIdPartition());
quotaManager.record(result.fetchDataInfo().map(fetchDataInfo -> fetchDataInfo.records.sizeInBytes()).orElse(0));
quotaManager.recordWithTopic(topic, result.fetchDataInfo().map(fetchDataInfo -> fetchDataInfo.records.sizeInBytes()).orElse(0));
callback.accept(result);
return null;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,11 @@ public final class BrokerTopicMetrics {
public static final String FAILED_SHARE_FETCH_REQUESTS_PER_SEC = "FailedShareFetchRequestsPerSec";
public static final String TOTAL_SHARE_ACKNOWLEDGEMENTS_REQUESTS_PER_SEC = "TotalShareAcknowledgementRequestsPerSec";
public static final String FAILED_SHARE_ACKNOWLEDGEMENTS_REQUESTS_PER_SEC = "FailedShareAcknowledgementRequestsPerSec";
// Quota observability metrics
public static final String REMOTE_FETCH_QUOTA_BYTES_PER_SEC = "RemoteFetchQuotaBytesPerSec";
public static final String REMOTE_FETCH_QUOTA_THROTTLED_PER_SEC = "RemoteFetchQuotaThrottledPerSec";
public static final String REMOTE_FETCH_QUOTA_THROTTLE_TIME_MS = "RemoteFetchQuotaThrottleTimeMs";
public static final String REMOTE_FETCH_SIZE_BYTES = "RemoteFetchSizeBytes";
// These following topics are for LogValidator for better debugging on failed records
public static final String NO_KEY_COMPACTED_TOPIC_RECORDS_PER_SEC = "NoKeyCompactedTopicRecordsPerSec";
public static final String INVALID_MAGIC_NUMBER_RECORDS_PER_SEC = "InvalidMagicNumberRecordsPerSec";
Expand All @@ -60,6 +65,7 @@ public final class BrokerTopicMetrics {
private final Map<String, String> tags;
private final Map<String, MeterWrapper> metricTypeMap = new java.util.HashMap<>();
private final Map<String, GaugeWrapper> metricGaugeTypeMap = new java.util.HashMap<>();
private final Map<String, HistogramWrapper> metricHistogramTypeMap = new java.util.HashMap<>();

public BrokerTopicMetrics(boolean remoteStorageEnabled) {
this(Optional.empty(), remoteStorageEnabled);
Expand Down Expand Up @@ -117,19 +123,34 @@ private BrokerTopicMetrics(Optional<String> name, boolean remoteStorageEnabled)
metricGaugeTypeMap.put(RemoteStorageMetrics.REMOTE_LOG_METADATA_COUNT_METRIC.getName(), new GaugeWrapper(RemoteStorageMetrics.REMOTE_LOG_METADATA_COUNT_METRIC.getName()));
metricGaugeTypeMap.put(RemoteStorageMetrics.REMOTE_LOG_SIZE_COMPUTATION_TIME_METRIC.getName(), new GaugeWrapper(RemoteStorageMetrics.REMOTE_LOG_SIZE_COMPUTATION_TIME_METRIC.getName()));
metricGaugeTypeMap.put(RemoteStorageMetrics.REMOTE_LOG_SIZE_BYTES_METRIC.getName(), new GaugeWrapper(RemoteStorageMetrics.REMOTE_LOG_SIZE_BYTES_METRIC.getName()));

// Quota observability metrics
metricTypeMap.put(REMOTE_FETCH_QUOTA_BYTES_PER_SEC, new MeterWrapper(REMOTE_FETCH_QUOTA_BYTES_PER_SEC, "bytes"));
metricTypeMap.put(REMOTE_FETCH_QUOTA_THROTTLED_PER_SEC, new MeterWrapper(REMOTE_FETCH_QUOTA_THROTTLED_PER_SEC, "requests"));
metricHistogramTypeMap.put(REMOTE_FETCH_QUOTA_THROTTLE_TIME_MS, new HistogramWrapper(REMOTE_FETCH_QUOTA_THROTTLE_TIME_MS));
metricHistogramTypeMap.put(REMOTE_FETCH_SIZE_BYTES, new HistogramWrapper(REMOTE_FETCH_SIZE_BYTES));
}
}

public void closeMetric(String metricName) {
MeterWrapper mw = metricTypeMap.get(metricName);
if (mw != null) mw.close();
if (mw != null) {
mw.close();
}
GaugeWrapper mg = metricGaugeTypeMap.get(metricName);
if (mg != null) mg.close();
if (mg != null) {
mg.close();
}
HistogramWrapper hg = metricHistogramTypeMap.get(metricName);
if (hg != null) {
hg.close();
}
}

public void close() {
metricTypeMap.values().forEach(MeterWrapper::close);
metricGaugeTypeMap.values().forEach(GaugeWrapper::close);
metricHistogramTypeMap.values().forEach(HistogramWrapper::close);
}

// used for testing only
Expand Down Expand Up @@ -345,6 +366,25 @@ public Meter failedBuildRemoteLogAuxStateRate() {
return metricTypeMap.get(RemoteStorageMetrics.FAILED_BUILD_REMOTE_LOG_AUX_STATE_PER_SEC_METRIC.getName()).meter();
}

// Quota observability metrics
public Meter remoteFetchQuotaBytesRate() {
MeterWrapper wrapper = metricTypeMap.get(REMOTE_FETCH_QUOTA_BYTES_PER_SEC);
return wrapper != null ? wrapper.meter() : null;
}

public Meter remoteFetchQuotaThrottledRate() {
MeterWrapper wrapper = metricTypeMap.get(REMOTE_FETCH_QUOTA_THROTTLED_PER_SEC);
return wrapper != null ? wrapper.meter() : null;
}

public HistogramWrapper remoteFetchQuotaThrottleTimeMs() {
return metricHistogramTypeMap.get(REMOTE_FETCH_QUOTA_THROTTLE_TIME_MS);
}

public HistogramWrapper remoteFetchSizeBytes() {
return metricHistogramTypeMap.get(REMOTE_FETCH_SIZE_BYTES);
}

private class MeterWrapper {
private final String metricType;
private final String eventType;
Expand Down Expand Up @@ -425,4 +465,50 @@ private void newGaugeIfNeed() {
metricsGroup.newGauge(metricType, () -> metricValues.values().stream().mapToLong(v -> v).sum(), tags);
}
}

public class HistogramWrapper {
private final String metricType;
private volatile com.yammer.metrics.core.Histogram lazyHistogram;
private final Lock histogramLock = new ReentrantLock();

public HistogramWrapper(String metricType) {
this.metricType = metricType;
if (tags.isEmpty()) {
histogram();
}
}

public com.yammer.metrics.core.Histogram histogram() {
com.yammer.metrics.core.Histogram histogram = lazyHistogram;
if (histogram == null) {
histogramLock.lock();
try {
histogram = lazyHistogram;
if (histogram == null) {
histogram = metricsGroup.newHistogram(metricType, true, tags);
lazyHistogram = histogram;
}
} finally {
histogramLock.unlock();
}
}
return histogram;
}

public void update(long value) {
histogram().update(value);
}

public void close() {
histogramLock.lock();
try {
if (lazyHistogram != null) {
metricsGroup.removeMetric(metricType, tags);
lazyHistogram = null;
}
} finally {
histogramLock.unlock();
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -83,9 +83,11 @@ public void testRemoteLogReaderWithoutError() throws RemoteStorageException, IOE
assertTrue(actualRemoteLogReadResult.fetchDataInfo().isPresent());
assertEquals(fetchDataInfo, actualRemoteLogReadResult.fetchDataInfo().get());

// verify the record method on quota manager was called with the expected value
// verify the recordWithTopic method on quota manager was called with the expected value
ArgumentCaptor<String> topicArg = ArgumentCaptor.forClass(String.class);
ArgumentCaptor<Double> recordedArg = ArgumentCaptor.forClass(Double.class);
verify(mockQuotaManager, times(1)).record(recordedArg.capture());
verify(mockQuotaManager, times(1)).recordWithTopic(topicArg.capture(), recordedArg.capture());
assertEquals(TOPIC, topicArg.getValue());
assertEquals(100, recordedArg.getValue());

// Verify metrics for remote reads are updated correctly
Expand Down Expand Up @@ -115,9 +117,11 @@ public void testRemoteLogReaderWithError() throws RemoteStorageException, IOExce
assertTrue(actualRemoteLogReadResult.error().isPresent());
assertFalse(actualRemoteLogReadResult.fetchDataInfo().isPresent());

// verify the record method on quota manager was called with the expected value
// verify the recordWithTopic method on quota manager was called with the expected value
ArgumentCaptor<String> topicArg = ArgumentCaptor.forClass(String.class);
ArgumentCaptor<Double> recordedArg = ArgumentCaptor.forClass(Double.class);
verify(mockQuotaManager, times(1)).record(recordedArg.capture());
verify(mockQuotaManager, times(1)).recordWithTopic(topicArg.capture(), recordedArg.capture());
assertEquals(TOPIC, topicArg.getValue());
assertEquals(0, recordedArg.getValue());

// Verify metrics for remote reads are updated correctly
Expand Down