From 2258521169560325a46b96df4af337abcf8db605 Mon Sep 17 00:00:00 2001 From: Kamal Chandraprakash Date: Tue, 4 Jun 2024 20:27:29 +0530 Subject: [PATCH 1/4] Support added to update the remote.fetch.max.wait.ms config dynamically. --- .../kafka/server/DynamicBrokerConfig.scala | 3 +- .../main/scala/kafka/server/KafkaConfig.scala | 2 ++ .../scala/kafka/server/ReplicaManager.scala | 3 +- .../server/DynamicBrokerConfigTest.scala | 34 +++++++++++++++++++ 4 files changed, 39 insertions(+), 3 deletions(-) diff --git a/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala b/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala index c9bb2e3b4ffda..22576bdceb6fd 100755 --- a/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala +++ b/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala @@ -1203,6 +1203,7 @@ class DynamicRemoteLogConfig(server: KafkaBroker) extends BrokerReconfigurable w object DynamicRemoteLogConfig { val ReconfigurableConfigs = Set( - RemoteLogManagerConfig.REMOTE_LOG_INDEX_FILE_CACHE_TOTAL_SIZE_BYTES_PROP + RemoteLogManagerConfig.REMOTE_LOG_INDEX_FILE_CACHE_TOTAL_SIZE_BYTES_PROP, + RemoteLogManagerConfig.REMOTE_FETCH_MAX_WAIT_MS_PROP ) } diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala index 215f827e21646..9ae6d708944ce 100755 --- a/core/src/main/scala/kafka/server/KafkaConfig.scala +++ b/core/src/main/scala/kafka/server/KafkaConfig.scala @@ -1211,6 +1211,8 @@ class KafkaConfig private(doLog: Boolean, val props: java.util.Map[_, _], dynami def logLocalRetentionMs: java.lang.Long = getLong(RemoteLogManagerConfig.LOG_LOCAL_RETENTION_MS_PROP) + def remoteFetchMaxWaitMs = getInt(RemoteLogManagerConfig.REMOTE_FETCH_MAX_WAIT_MS_PROP) + validateValues() @nowarn("cat=deprecation") diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala index 17e9a5c1b9e79..6aecf0d04f9a9 100644 --- a/core/src/main/scala/kafka/server/ReplicaManager.scala +++ b/core/src/main/scala/kafka/server/ReplicaManager.scala @@ -1479,10 +1479,9 @@ class ReplicaManager(val config: KafkaConfig, return Some(createLogReadResult(e)) } - val remoteFetchMaxWaitMs = config.remoteLogManagerConfig.remoteFetchMaxWaitMs() + val remoteFetchMaxWaitMs = config.remoteFetchMaxWaitMs.asInstanceOf[Long] val remoteFetch = new DelayedRemoteFetch(remoteFetchTask, remoteFetchResult, remoteFetchInfo, remoteFetchMaxWaitMs, fetchPartitionStatus, params, logReadResults, this, responseCallback) - delayedRemoteFetchPurgatory.tryCompleteElseWatch(remoteFetch, Seq(key)) None } diff --git a/core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala b/core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala index 328d6e41b5722..e17c9e2901974 100755 --- a/core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala +++ b/core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala @@ -792,6 +792,40 @@ class DynamicBrokerConfigTest { verifyIncorrectLogLocalRetentionProps(2000L, 1000L, -1, 100) } + @Test + def testDynamicRemoteFetchMaxWaitMsConfig(): Unit = { + val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port = 8181) + val config = KafkaConfig(props) + val kafkaBroker = mock(classOf[KafkaBroker]) + when(kafkaBroker.config).thenReturn(config) + assertEquals(500, config.remoteFetchMaxWaitMs) + + val dynamicRemoteLogConfig = new DynamicRemoteLogConfig(kafkaBroker) + config.dynamicConfig.initialize(None, None) + config.dynamicConfig.addBrokerReconfigurable(dynamicRemoteLogConfig) + + val newProps = new Properties() + newProps.put(RemoteLogManagerConfig.REMOTE_FETCH_MAX_WAIT_MS_PROP, "30000") + // update default config + config.dynamicConfig.validate(newProps, perBrokerConfig = false) + config.dynamicConfig.updateDefaultConfig(newProps) + assertEquals(30000, config.remoteFetchMaxWaitMs) + + // update per broker config + newProps.put(RemoteLogManagerConfig.REMOTE_FETCH_MAX_WAIT_MS_PROP, "10000") + config.dynamicConfig.validate(newProps, perBrokerConfig = true) + config.dynamicConfig.updateBrokerConfig(0, newProps) + assertEquals(10000, config.remoteFetchMaxWaitMs) + + // invalid value 1 + newProps.put(RemoteLogManagerConfig.REMOTE_FETCH_MAX_WAIT_MS_PROP, "-1") + assertThrows(classOf[ConfigException], () => config.dynamicConfig.validate(newProps, perBrokerConfig = true)) + assertThrows(classOf[ConfigException], () => config.dynamicConfig.validate(newProps, perBrokerConfig = false)) + // invalid value 2 + newProps.put(RemoteLogManagerConfig.REMOTE_FETCH_MAX_WAIT_MS_PROP, "0") + assertThrows(classOf[ConfigException], () => config.dynamicConfig.validate(newProps, perBrokerConfig = true)) + } + @Test def testUpdateDynamicRemoteLogManagerConfig(): Unit = { val origProps = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port = 8181) From e4243bc2f5a4d344c6a6a87ee67776aed06b70e3 Mon Sep 17 00:00:00 2001 From: Kamal Chandraprakash Date: Thu, 6 Jun 2024 23:24:08 +0530 Subject: [PATCH 2/4] resolve conflicts after rebase --- core/src/main/scala/kafka/server/ReplicaManager.scala | 2 +- .../server/log/remote/storage/RemoteLogManagerConfig.java | 4 ---- 2 files changed, 1 insertion(+), 5 deletions(-) diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala index 6aecf0d04f9a9..6ab67a84ec426 100644 --- a/core/src/main/scala/kafka/server/ReplicaManager.scala +++ b/core/src/main/scala/kafka/server/ReplicaManager.scala @@ -1479,7 +1479,7 @@ class ReplicaManager(val config: KafkaConfig, return Some(createLogReadResult(e)) } - val remoteFetchMaxWaitMs = config.remoteFetchMaxWaitMs.asInstanceOf[Long] + val remoteFetchMaxWaitMs = config.remoteFetchMaxWaitMs.toLong val remoteFetch = new DelayedRemoteFetch(remoteFetchTask, remoteFetchResult, remoteFetchInfo, remoteFetchMaxWaitMs, fetchPartitionStatus, params, logReadResults, this, responseCallback) delayedRemoteFetchPurgatory.tryCompleteElseWatch(remoteFetch, Seq(key)) diff --git a/storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerConfig.java b/storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerConfig.java index 8224ab2382549..c54819142197a 100644 --- a/storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerConfig.java +++ b/storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerConfig.java @@ -471,10 +471,6 @@ public int remoteLogManagerFetchQuotaWindowSizeSeconds() { return getInt(REMOTE_LOG_MANAGER_FETCH_QUOTA_WINDOW_SIZE_SECONDS_PROP); } - public int remoteFetchMaxWaitMs() { - return getInt(REMOTE_FETCH_MAX_WAIT_MS_PROP); - } - public static void main(String[] args) { System.out.println(configDef().toHtml(4, config -> "remote_log_manager_" + config)); } From 6c09a41e8c227bb6e6339f82f9ec0c64a74b922b Mon Sep 17 00:00:00 2001 From: Kamal Chandraprakash Date: Thu, 6 Jun 2024 23:39:05 +0530 Subject: [PATCH 3/4] update comments --- .../scala/unit/kafka/server/DynamicBrokerConfigTest.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala b/core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala index e17c9e2901974..734934926b8bf 100755 --- a/core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala +++ b/core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala @@ -817,11 +817,11 @@ class DynamicBrokerConfigTest { config.dynamicConfig.updateBrokerConfig(0, newProps) assertEquals(10000, config.remoteFetchMaxWaitMs) - // invalid value 1 + // invalid value "-1" newProps.put(RemoteLogManagerConfig.REMOTE_FETCH_MAX_WAIT_MS_PROP, "-1") assertThrows(classOf[ConfigException], () => config.dynamicConfig.validate(newProps, perBrokerConfig = true)) assertThrows(classOf[ConfigException], () => config.dynamicConfig.validate(newProps, perBrokerConfig = false)) - // invalid value 2 + // invalid value "0" newProps.put(RemoteLogManagerConfig.REMOTE_FETCH_MAX_WAIT_MS_PROP, "0") assertThrows(classOf[ConfigException], () => config.dynamicConfig.validate(newProps, perBrokerConfig = true)) } From ee86d3462f23a115663f537f4044d7f856556654 Mon Sep 17 00:00:00 2001 From: Kamal Chandraprakash Date: Fri, 7 Jun 2024 11:49:46 +0530 Subject: [PATCH 4/4] addressed the review comments --- .../unit/kafka/server/DynamicBrokerConfigTest.scala | 13 ++++++------- 1 file changed, 6 insertions(+), 7 deletions(-) diff --git a/core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala b/core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala index 734934926b8bf..2e5d77cdc6fae 100755 --- a/core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala +++ b/core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala @@ -817,13 +817,12 @@ class DynamicBrokerConfigTest { config.dynamicConfig.updateBrokerConfig(0, newProps) assertEquals(10000, config.remoteFetchMaxWaitMs) - // invalid value "-1" - newProps.put(RemoteLogManagerConfig.REMOTE_FETCH_MAX_WAIT_MS_PROP, "-1") - assertThrows(classOf[ConfigException], () => config.dynamicConfig.validate(newProps, perBrokerConfig = true)) - assertThrows(classOf[ConfigException], () => config.dynamicConfig.validate(newProps, perBrokerConfig = false)) - // invalid value "0" - newProps.put(RemoteLogManagerConfig.REMOTE_FETCH_MAX_WAIT_MS_PROP, "0") - assertThrows(classOf[ConfigException], () => config.dynamicConfig.validate(newProps, perBrokerConfig = true)) + // invalid values + for (maxWaitMs <- Seq(-1, 0)) { + newProps.put(RemoteLogManagerConfig.REMOTE_FETCH_MAX_WAIT_MS_PROP, maxWaitMs.toString) + assertThrows(classOf[ConfigException], () => config.dynamicConfig.validate(newProps, perBrokerConfig = true)) + assertThrows(classOf[ConfigException], () => config.dynamicConfig.validate(newProps, perBrokerConfig = false)) + } } @Test