From 5916bea2eedc4c29427529fac11cb395ad416d65 Mon Sep 17 00:00:00 2001 From: PoAn Yang Date: Tue, 14 Jan 2025 17:31:24 +0800 Subject: [PATCH 1/2] KAFAK-18451: Flaky RemoteLogManagerTest#testRLMOpsWhenMetadataIsNotReady Signed-off-by: PoAn Yang --- .../log/remote/RemoteLogManagerTest.java | 38 ++++++++++++++++++- 1 file changed, 36 insertions(+), 2 deletions(-) diff --git a/core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java b/core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java index 166e58f92175d..c0ad3c7042d0b 100644 --- a/core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java +++ b/core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java @@ -3682,8 +3682,42 @@ int lookupPositionForOffset(RemoteLogSegmentMetadata remoteLogSegmentMetadata, l } @Test - public void testRLMOpsWhenMetadataIsNotReady() throws InterruptedException { - CountDownLatch latch = new CountDownLatch(2); + public void testRLMOpsWhenMetadataIsNotReady() throws InterruptedException, IOException { + // Recreate a remoteLogManager with default REMOTE_LOG_MANAGER_TASK_INTERVAL_MS_PROP (default value is 30000). + // The value in setup function is 100 which is too small. If the case can't run two verifyNoMoreInteractions in + // 100ms, the test will fail. + remoteLogManager.close(); + clearInvocations(remoteLogMetadataManager, remoteStorageManager); + Properties props = brokerConfig; + props.setProperty(RemoteLogManagerConfig.REMOTE_LOG_STORAGE_SYSTEM_ENABLE_PROP, "true"); + props.setProperty(RemoteLogManagerConfig.REMOTE_LOG_MANAGER_TASK_INTERVAL_MS_PROP, "30000"); + appendRLMConfig(props); + config = KafkaConfig.fromProps(props); + + remoteLogManager = new RemoteLogManager(config.remoteLogManagerConfig(), brokerId, logDir, clusterId, time, + tp -> Optional.of(mockLog), + (topicPartition, offset) -> currentLogStartOffset.set(offset), + brokerTopicStats, metrics) { + public RemoteStorageManager createRemoteStorageManager() { + return remoteStorageManager; + } + public RemoteLogMetadataManager createRemoteLogMetadataManager() { + return remoteLogMetadataManager; + } + public RLMQuotaManager createRLMCopyQuotaManager() { + return rlmCopyQuotaManager; + } + public Duration quotaTimeout() { + return Duration.ofMillis(100); + } + @Override + long findLogStartOffset(TopicIdPartition topicIdPartition, UnifiedLog log) { + return 0L; + } + }; + doReturn(true).when(remoteLogMetadataManager).isReady(any(TopicIdPartition.class)); + + CountDownLatch latch = new CountDownLatch(3); // there are 3 RLMTasks, so setting the count to 3 when(remoteLogMetadataManager.isReady(any(TopicIdPartition.class))) .thenAnswer(ans -> { latch.countDown(); From 17102e1b7212339ec27c793ad1fcd3af4adaecc6 Mon Sep 17 00:00:00 2001 From: PoAn Yang Date: Tue, 14 Jan 2025 17:49:28 +0800 Subject: [PATCH 2/2] address comment Signed-off-by: PoAn Yang --- core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java | 1 - 1 file changed, 1 deletion(-) diff --git a/core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java b/core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java index c0ad3c7042d0b..b956c12380694 100644 --- a/core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java +++ b/core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java @@ -3715,7 +3715,6 @@ long findLogStartOffset(TopicIdPartition topicIdPartition, UnifiedLog log) { return 0L; } }; - doReturn(true).when(remoteLogMetadataManager).isReady(any(TopicIdPartition.class)); CountDownLatch latch = new CountDownLatch(3); // there are 3 RLMTasks, so setting the count to 3 when(remoteLogMetadataManager.isReady(any(TopicIdPartition.class)))