Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion core/src/main/java/kafka/log/remote/RemoteLogManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,7 @@ public class RemoteLogManager implements Closeable {
private final RLMQuotaManager rlmCopyQuotaManager;
private final RLMQuotaManager rlmFetchQuotaManager;
private final Sensor fetchThrottleTimeSensor;
private final Sensor copyThrottleTimeSensor;

private final RemoteIndexCache indexCache;
private final RemoteStorageThreadPool remoteStorageReaderThreadPool;
Expand Down Expand Up @@ -235,6 +236,8 @@ public RemoteLogManager(RemoteLogManagerConfig rlmConfig,

fetchThrottleTimeSensor = new RLMQuotaMetrics(metrics, "remote-fetch-throttle-time", RemoteLogManager.class.getSimpleName(),
"The %s time in millis remote fetches was throttled by a broker", INACTIVE_SENSOR_EXPIRATION_TIME_SECONDS).sensor();
copyThrottleTimeSensor = new RLMQuotaMetrics(metrics, "remote-copy-throttle-time", RemoteLogManager.class.getSimpleName(),
"The %s time in millis remote copies was throttled by a broker", INACTIVE_SENSOR_EXPIRATION_TIME_SECONDS).sensor();

indexCache = new RemoteIndexCache(rlmConfig.remoteLogIndexFileCacheTotalSizeBytes(), remoteLogStorageManager, logDir);
delayInMs = rlmConfig.remoteLogManagerTaskIntervalMs();
Expand Down Expand Up @@ -815,13 +818,16 @@ public void copyLogSegmentsToRemote(UnifiedLog log) throws InterruptedException

copyQuotaManagerLock.lock();
try {
while (rlmCopyQuotaManager.getThrottleTimeMs() > 0) {
long throttleTimeMs = rlmCopyQuotaManager.getThrottleTimeMs();
while (throttleTimeMs > 0) {
copyThrottleTimeSensor.record(throttleTimeMs, time.milliseconds());
logger.debug("Quota exceeded for copying log segments, waiting for the quota to be available.");
// If the thread gets interrupted while waiting, the InterruptedException is thrown
// back to the caller. It's important to note that the task being executed is already
// cancelled before the executing thread is interrupted. The caller is responsible
// for handling the exception gracefully by checking if the task is already cancelled.
boolean ignored = copyQuotaManagerLockCondition.await(quotaTimeout().toMillis(), TimeUnit.MILLISECONDS);
throttleTimeMs = rlmCopyQuotaManager.getThrottleTimeMs();
}
rlmCopyQuotaManager.record(candidateLogSegment.logSegment.log().sizeInBytes());
// Signal waiting threads to check the quota again
Expand Down
20 changes: 20 additions & 0 deletions core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.compress.Compression;
import org.apache.kafka.common.errors.ReplicaNotAvailableException;
import org.apache.kafka.common.metrics.KafkaMetric;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.network.ListenerName;
import org.apache.kafka.common.record.FileRecords;
Expand Down Expand Up @@ -192,6 +193,8 @@ public class RemoteLogManagerTest {
private final String remoteLogMetadataConsumerTestProp = REMOTE_LOG_METADATA_CONSUMER_PREFIX + "consumer.test";
private final String remoteLogMetadataConsumerTestVal = "consumer.test";
private final String remoteLogMetadataTopicPartitionsNum = "1";
private final long quotaExceededThrottleTime = 1000L;
private final long quotaAvailableThrottleTime = 0L;

private final RemoteStorageManager remoteStorageManager = mock(RemoteStorageManager.class);
private final RemoteLogMetadataManager remoteLogMetadataManager = mock(RemoteLogMetadataManager.class);
Expand Down Expand Up @@ -536,6 +539,7 @@ private void assertCopyExpectedLogSegmentsToRemote(long oldSegmentStartOffset,
when(remoteLogMetadataManager.updateRemoteLogSegmentMetadata(any(RemoteLogSegmentMetadataUpdate.class))).thenReturn(dummyFuture);
when(remoteStorageManager.copyLogSegmentData(any(RemoteLogSegmentMetadata.class), any(LogSegmentData.class)))
.thenReturn(Optional.empty());
when(rlmCopyQuotaManager.getThrottleTimeMs()).thenReturn(quotaAvailableThrottleTime);

// Verify the metrics for remote writes and for failures is zero before attempt to copy log segment
assertEquals(0, brokerTopicStats.topicStats(leaderTopicIdPartition.topic()).remoteCopyRequestRate().count());
Expand Down Expand Up @@ -652,6 +656,7 @@ void testCustomMetadataSizeExceedsLimit() throws Exception {
when(remoteLogMetadataManager.addRemoteLogSegmentMetadata(any(RemoteLogSegmentMetadata.class))).thenReturn(dummyFuture);
when(remoteStorageManager.copyLogSegmentData(any(RemoteLogSegmentMetadata.class), any(LogSegmentData.class)))
.thenReturn(Optional.of(customMetadata));
when(rlmCopyQuotaManager.getThrottleTimeMs()).thenReturn(quotaAvailableThrottleTime);

RemoteLogManager.RLMTask task = remoteLogManager.new RLMTask(leaderTopicIdPartition, customMetadataSizeLimit);
task.convertToLeader(2);
Expand Down Expand Up @@ -879,6 +884,7 @@ void testRemoteLogTaskUpdateRemoteLogSegmentMetadataAfterLogDirChanged() throws
copyLogSegmentLatch.await(5000, TimeUnit.MILLISECONDS);
return Optional.empty();
}).when(remoteStorageManager).copyLogSegmentData(any(RemoteLogSegmentMetadata.class), any(LogSegmentData.class));
when(rlmCopyQuotaManager.getThrottleTimeMs()).thenReturn(quotaAvailableThrottleTime);

Partition mockLeaderPartition = mockPartition(leaderTopicIdPartition);
List<RemoteLogSegmentMetadata> metadataList = listRemoteLogSegmentMetadata(leaderTopicIdPartition, segmentCount, 100, 1024, RemoteLogSegmentState.COPY_SEGMENT_FINISHED);
Expand Down Expand Up @@ -1022,6 +1028,7 @@ void testRemoteLogManagerRemoteMetrics() throws Exception {
latch.await(5000, TimeUnit.MILLISECONDS);
return Optional.empty();
}).when(remoteStorageManager).copyLogSegmentData(any(RemoteLogSegmentMetadata.class), any(LogSegmentData.class));
when(rlmCopyQuotaManager.getThrottleTimeMs()).thenReturn(quotaAvailableThrottleTime);

Partition mockLeaderPartition = mockPartition(leaderTopicIdPartition);

Expand Down Expand Up @@ -1139,6 +1146,7 @@ void testMetricsUpdateOnCopyLogSegmentsFailure() throws Exception {
dummyFuture.complete(null);
when(remoteLogMetadataManager.addRemoteLogSegmentMetadata(any(RemoteLogSegmentMetadata.class))).thenReturn(dummyFuture);
doThrow(new RuntimeException()).when(remoteStorageManager).copyLogSegmentData(any(RemoteLogSegmentMetadata.class), any(LogSegmentData.class));
when(rlmCopyQuotaManager.getThrottleTimeMs()).thenReturn(quotaAvailableThrottleTime);

// Verify the metrics for remote write requests/failures is zero before attempt to copy log segment
assertEquals(0, brokerTopicStats.topicStats(leaderTopicIdPartition.topic()).remoteCopyRequestRate().count());
Expand Down Expand Up @@ -2860,6 +2868,12 @@ public void testCopyQuota(boolean quotaExceeded) throws Exception {
// Verify that the copy operation times out, since no segments can be copied due to quota being exceeded
assertThrows(AssertionFailedError.class, () -> assertTimeoutPreemptively(Duration.ofMillis(200), () -> task.copyLogSegmentsToRemote(mockLog)));

Map<org.apache.kafka.common.MetricName, KafkaMetric> allMetrics = metrics.metrics();
KafkaMetric avgMetric = allMetrics.get(metrics.metricName("remote-copy-throttle-time-avg", "RemoteLogManager"));
KafkaMetric maxMetric = allMetrics.get(metrics.metricName("remote-copy-throttle-time-max", "RemoteLogManager"));
assertEquals(quotaExceededThrottleTime, ((Double) avgMetric.metricValue()).longValue());
assertEquals(quotaExceededThrottleTime, ((Double) maxMetric.metricValue()).longValue());

// Verify the highest offset in remote storage is updated only once
ArgumentCaptor<Long> capture = ArgumentCaptor.forClass(Long.class);
verify(mockLog, times(1)).updateHighestOffsetInRemoteStorage(capture.capture());
Expand All @@ -2874,6 +2888,12 @@ public void testCopyQuota(boolean quotaExceeded) throws Exception {
// Verify bytes to copy was recorded with the quota manager
verify(rlmCopyQuotaManager, times(1)).record(10);

Map<org.apache.kafka.common.MetricName, KafkaMetric> allMetrics = metrics.metrics();
KafkaMetric avgMetric = allMetrics.get(metrics.metricName("remote-copy-throttle-time-avg", "RemoteLogManager"));
KafkaMetric maxMetric = allMetrics.get(metrics.metricName("remote-copy-throttle-time-max", "RemoteLogManager"));
assertEquals(Double.NaN, avgMetric.metricValue());
assertEquals(Double.NaN, maxMetric.metricValue());

// Verify the highest offset in remote storage is updated
ArgumentCaptor<Long> capture = ArgumentCaptor.forClass(Long.class);
verify(mockLog, times(2)).updateHighestOffsetInRemoteStorage(capture.capture());
Expand Down