From f99c139408ca6daffe7876b9ca540c61bbf82374 Mon Sep 17 00:00:00 2001 From: abhijeetk88 Date: Wed, 12 Jun 2024 12:02:19 +0530 Subject: [PATCH] KAFKA-17069: Remote log copy throttle metrics --- .../kafka/log/remote/RemoteLogManager.java | 8 +++++++- .../log/remote/RemoteLogManagerTest.java | 20 +++++++++++++++++++ 2 files changed, 27 insertions(+), 1 deletion(-) diff --git a/core/src/main/java/kafka/log/remote/RemoteLogManager.java b/core/src/main/java/kafka/log/remote/RemoteLogManager.java index 2d589337e7170..8beb6971aeb82 100644 --- a/core/src/main/java/kafka/log/remote/RemoteLogManager.java +++ b/core/src/main/java/kafka/log/remote/RemoteLogManager.java @@ -175,6 +175,7 @@ public class RemoteLogManager implements Closeable { private final RLMQuotaManager rlmCopyQuotaManager; private final RLMQuotaManager rlmFetchQuotaManager; private final Sensor fetchThrottleTimeSensor; + private final Sensor copyThrottleTimeSensor; private final RemoteIndexCache indexCache; private final RemoteStorageThreadPool remoteStorageReaderThreadPool; @@ -235,6 +236,8 @@ public RemoteLogManager(RemoteLogManagerConfig rlmConfig, fetchThrottleTimeSensor = new RLMQuotaMetrics(metrics, "remote-fetch-throttle-time", RemoteLogManager.class.getSimpleName(), "The %s time in millis remote fetches was throttled by a broker", INACTIVE_SENSOR_EXPIRATION_TIME_SECONDS).sensor(); + copyThrottleTimeSensor = new RLMQuotaMetrics(metrics, "remote-copy-throttle-time", RemoteLogManager.class.getSimpleName(), + "The %s time in millis remote copies was throttled by a broker", INACTIVE_SENSOR_EXPIRATION_TIME_SECONDS).sensor(); indexCache = new RemoteIndexCache(rlmConfig.remoteLogIndexFileCacheTotalSizeBytes(), remoteLogStorageManager, logDir); delayInMs = rlmConfig.remoteLogManagerTaskIntervalMs(); @@ -815,13 +818,16 @@ public void copyLogSegmentsToRemote(UnifiedLog log) throws InterruptedException copyQuotaManagerLock.lock(); try { - while (rlmCopyQuotaManager.getThrottleTimeMs() > 0) { + long throttleTimeMs = rlmCopyQuotaManager.getThrottleTimeMs(); + while (throttleTimeMs > 0) { + copyThrottleTimeSensor.record(throttleTimeMs, time.milliseconds()); logger.debug("Quota exceeded for copying log segments, waiting for the quota to be available."); // If the thread gets interrupted while waiting, the InterruptedException is thrown // back to the caller. It's important to note that the task being executed is already // cancelled before the executing thread is interrupted. The caller is responsible // for handling the exception gracefully by checking if the task is already cancelled. boolean ignored = copyQuotaManagerLockCondition.await(quotaTimeout().toMillis(), TimeUnit.MILLISECONDS); + throttleTimeMs = rlmCopyQuotaManager.getThrottleTimeMs(); } rlmCopyQuotaManager.record(candidateLogSegment.logSegment.log().sizeInBytes()); // Signal waiting threads to check the quota again diff --git a/core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java b/core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java index 8cf190785f5a4..c9c37de929b2b 100644 --- a/core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java +++ b/core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java @@ -31,6 +31,7 @@ import org.apache.kafka.common.Uuid; import org.apache.kafka.common.compress.Compression; import org.apache.kafka.common.errors.ReplicaNotAvailableException; +import org.apache.kafka.common.metrics.KafkaMetric; import org.apache.kafka.common.metrics.Metrics; import org.apache.kafka.common.network.ListenerName; import org.apache.kafka.common.record.FileRecords; @@ -192,6 +193,8 @@ public class RemoteLogManagerTest { private final String remoteLogMetadataConsumerTestProp = REMOTE_LOG_METADATA_CONSUMER_PREFIX + "consumer.test"; private final String remoteLogMetadataConsumerTestVal = "consumer.test"; private final String remoteLogMetadataTopicPartitionsNum = "1"; + private final long quotaExceededThrottleTime = 1000L; + private final long quotaAvailableThrottleTime = 0L; private final RemoteStorageManager remoteStorageManager = mock(RemoteStorageManager.class); private final RemoteLogMetadataManager remoteLogMetadataManager = mock(RemoteLogMetadataManager.class); @@ -536,6 +539,7 @@ private void assertCopyExpectedLogSegmentsToRemote(long oldSegmentStartOffset, when(remoteLogMetadataManager.updateRemoteLogSegmentMetadata(any(RemoteLogSegmentMetadataUpdate.class))).thenReturn(dummyFuture); when(remoteStorageManager.copyLogSegmentData(any(RemoteLogSegmentMetadata.class), any(LogSegmentData.class))) .thenReturn(Optional.empty()); + when(rlmCopyQuotaManager.getThrottleTimeMs()).thenReturn(quotaAvailableThrottleTime); // Verify the metrics for remote writes and for failures is zero before attempt to copy log segment assertEquals(0, brokerTopicStats.topicStats(leaderTopicIdPartition.topic()).remoteCopyRequestRate().count()); @@ -652,6 +656,7 @@ void testCustomMetadataSizeExceedsLimit() throws Exception { when(remoteLogMetadataManager.addRemoteLogSegmentMetadata(any(RemoteLogSegmentMetadata.class))).thenReturn(dummyFuture); when(remoteStorageManager.copyLogSegmentData(any(RemoteLogSegmentMetadata.class), any(LogSegmentData.class))) .thenReturn(Optional.of(customMetadata)); + when(rlmCopyQuotaManager.getThrottleTimeMs()).thenReturn(quotaAvailableThrottleTime); RemoteLogManager.RLMTask task = remoteLogManager.new RLMTask(leaderTopicIdPartition, customMetadataSizeLimit); task.convertToLeader(2); @@ -879,6 +884,7 @@ void testRemoteLogTaskUpdateRemoteLogSegmentMetadataAfterLogDirChanged() throws copyLogSegmentLatch.await(5000, TimeUnit.MILLISECONDS); return Optional.empty(); }).when(remoteStorageManager).copyLogSegmentData(any(RemoteLogSegmentMetadata.class), any(LogSegmentData.class)); + when(rlmCopyQuotaManager.getThrottleTimeMs()).thenReturn(quotaAvailableThrottleTime); Partition mockLeaderPartition = mockPartition(leaderTopicIdPartition); List metadataList = listRemoteLogSegmentMetadata(leaderTopicIdPartition, segmentCount, 100, 1024, RemoteLogSegmentState.COPY_SEGMENT_FINISHED); @@ -1022,6 +1028,7 @@ void testRemoteLogManagerRemoteMetrics() throws Exception { latch.await(5000, TimeUnit.MILLISECONDS); return Optional.empty(); }).when(remoteStorageManager).copyLogSegmentData(any(RemoteLogSegmentMetadata.class), any(LogSegmentData.class)); + when(rlmCopyQuotaManager.getThrottleTimeMs()).thenReturn(quotaAvailableThrottleTime); Partition mockLeaderPartition = mockPartition(leaderTopicIdPartition); @@ -1139,6 +1146,7 @@ void testMetricsUpdateOnCopyLogSegmentsFailure() throws Exception { dummyFuture.complete(null); when(remoteLogMetadataManager.addRemoteLogSegmentMetadata(any(RemoteLogSegmentMetadata.class))).thenReturn(dummyFuture); doThrow(new RuntimeException()).when(remoteStorageManager).copyLogSegmentData(any(RemoteLogSegmentMetadata.class), any(LogSegmentData.class)); + when(rlmCopyQuotaManager.getThrottleTimeMs()).thenReturn(quotaAvailableThrottleTime); // Verify the metrics for remote write requests/failures is zero before attempt to copy log segment assertEquals(0, brokerTopicStats.topicStats(leaderTopicIdPartition.topic()).remoteCopyRequestRate().count()); @@ -2860,6 +2868,12 @@ public void testCopyQuota(boolean quotaExceeded) throws Exception { // Verify that the copy operation times out, since no segments can be copied due to quota being exceeded assertThrows(AssertionFailedError.class, () -> assertTimeoutPreemptively(Duration.ofMillis(200), () -> task.copyLogSegmentsToRemote(mockLog))); + Map allMetrics = metrics.metrics(); + KafkaMetric avgMetric = allMetrics.get(metrics.metricName("remote-copy-throttle-time-avg", "RemoteLogManager")); + KafkaMetric maxMetric = allMetrics.get(metrics.metricName("remote-copy-throttle-time-max", "RemoteLogManager")); + assertEquals(quotaExceededThrottleTime, ((Double) avgMetric.metricValue()).longValue()); + assertEquals(quotaExceededThrottleTime, ((Double) maxMetric.metricValue()).longValue()); + // Verify the highest offset in remote storage is updated only once ArgumentCaptor capture = ArgumentCaptor.forClass(Long.class); verify(mockLog, times(1)).updateHighestOffsetInRemoteStorage(capture.capture()); @@ -2874,6 +2888,12 @@ public void testCopyQuota(boolean quotaExceeded) throws Exception { // Verify bytes to copy was recorded with the quota manager verify(rlmCopyQuotaManager, times(1)).record(10); + Map allMetrics = metrics.metrics(); + KafkaMetric avgMetric = allMetrics.get(metrics.metricName("remote-copy-throttle-time-avg", "RemoteLogManager")); + KafkaMetric maxMetric = allMetrics.get(metrics.metricName("remote-copy-throttle-time-max", "RemoteLogManager")); + assertEquals(Double.NaN, avgMetric.metricValue()); + assertEquals(Double.NaN, maxMetric.metricValue()); + // Verify the highest offset in remote storage is updated ArgumentCaptor capture = ArgumentCaptor.forClass(Long.class); verify(mockLog, times(2)).updateHighestOffsetInRemoteStorage(capture.capture());