From 0658846ad7e0cf0ee569598524d778b246c5fbdd Mon Sep 17 00:00:00 2001 From: frankvicky Date: Wed, 12 Jun 2024 17:38:57 +0800 Subject: [PATCH 1/5] KAFKA-16938: Break circular reference between DynamicBrokerConfig and DynamicConfig --- .../scala/kafka/server/DynamicBrokerConfig.scala | 6 ++---- .../main/scala/kafka/server/DynamicConfig.scala | 14 +++++++++++--- 2 files changed, 13 insertions(+), 7 deletions(-) diff --git a/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala b/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala index 22576bdceb6fd..a90a27cb0fd04 100755 --- a/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala +++ b/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala @@ -117,8 +117,6 @@ object DynamicBrokerConfig { AllDynamicConfigs.intersect(passwordConfigs) } - private val nonDynamicProps: Set[String] = KafkaConfig.configNames.toSet -- DynamicConfig.Broker.names.asScala - def isPasswordConfig(name: String): Boolean = DynamicBrokerConfig.DynamicPasswordConfigs.exists(name.endsWith) def brokerConfigSynonyms(name: String, matchListenerOverride: Boolean): List[String] = { @@ -168,7 +166,7 @@ object DynamicBrokerConfig { } private def nonDynamicConfigs(props: Properties): Set[String] = { - props.asScala.keySet.intersect(nonDynamicProps) + props.asScala.keySet.intersect(DynamicConfig.Broker.nonDynamicProps) } private def securityConfigsWithoutListenerPrefix(props: Properties): Set[String] = { @@ -321,7 +319,7 @@ class DynamicBrokerConfig(private val kafkaConfig: KafkaConfig) extends Logging } private def verifyReconfigurableConfigs(configNames: Set[String]): Unit = CoreUtils.inWriteLock(lock) { - val nonDynamic = configNames.intersect(nonDynamicProps) + val nonDynamic = configNames.intersect(DynamicConfig.Broker.nonDynamicProps) require(nonDynamic.isEmpty, s"Reconfigurable contains non-dynamic configs $nonDynamic") } diff --git a/core/src/main/scala/kafka/server/DynamicConfig.scala b/core/src/main/scala/kafka/server/DynamicConfig.scala index 955103bea6f8e..ef68c4b465456 100644 --- a/core/src/main/scala/kafka/server/DynamicConfig.scala +++ b/core/src/main/scala/kafka/server/DynamicConfig.scala @@ -17,6 +17,8 @@ package kafka.server +import kafka.server.DynamicBrokerConfig.AllDynamicConfigs + import java.net.{InetAddress, UnknownHostException} import java.util.Properties import org.apache.kafka.common.config.ConfigDef @@ -30,10 +32,16 @@ import scala.jdk.CollectionConverters._ * and can only be set dynamically. */ object DynamicConfig { + object Broker { + private val brokerConfigs = { + val configs = QuotaConfigs.brokerQuotaConfigs() + KafkaConfig.configKeys + .filter { case (configName, _) => AllDynamicConfigs.contains(configName) } + .foreach { case (_, config) => configs.define(config) } + configs + } - object Broker { - private val brokerConfigs = QuotaConfigs.brokerQuotaConfigs() - DynamicBrokerConfig.addDynamicConfigs(brokerConfigs) + val nonDynamicProps: Set[String] = KafkaConfig.configNames.toSet -- brokerConfigs.names.asScala def configKeys: util.Map[String, ConfigDef.ConfigKey] = brokerConfigs.configKeys From 88ac773e7f0144c99249a4a501c56a5ab1b19a2c Mon Sep 17 00:00:00 2001 From: frankvicky Date: Wed, 12 Jun 2024 19:39:07 +0800 Subject: [PATCH 2/5] KAFKA-16938: Add comment and correct indentation --- core/src/main/scala/kafka/server/DynamicConfig.scala | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/core/src/main/scala/kafka/server/DynamicConfig.scala b/core/src/main/scala/kafka/server/DynamicConfig.scala index ef68c4b465456..18006f5974b0d 100644 --- a/core/src/main/scala/kafka/server/DynamicConfig.scala +++ b/core/src/main/scala/kafka/server/DynamicConfig.scala @@ -35,13 +35,17 @@ object DynamicConfig { object Broker { private val brokerConfigs = { val configs = QuotaConfigs.brokerQuotaConfigs() + + // Filter and define all dynamic configurations KafkaConfig.configKeys .filter { case (configName, _) => AllDynamicConfigs.contains(configName) } .foreach { case (_, config) => configs.define(config) } configs } - val nonDynamicProps: Set[String] = KafkaConfig.configNames.toSet -- brokerConfigs.names.asScala + // Non-dynamic properties are determined by subtracting dynamic broker config names from all config names. + // This is to avoid circular reference issues during initialization. + val nonDynamicProps: Set[String] = KafkaConfig.configNames.toSet -- brokerConfigs.names.asScala def configKeys: util.Map[String, ConfigDef.ConfigKey] = brokerConfigs.configKeys From a59fa3f88445fac69c7666525e0f6c6069952eb5 Mon Sep 17 00:00:00 2001 From: frankvicky Date: Wed, 12 Jun 2024 21:07:21 +0800 Subject: [PATCH 3/5] KAFKA-16938: Remove unused method addDynamicConfigs --- .../main/scala/kafka/server/DynamicBrokerConfig.scala | 10 ---------- 1 file changed, 10 deletions(-) diff --git a/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala b/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala index a90a27cb0fd04..904d04fb503d7 100755 --- a/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala +++ b/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala @@ -182,16 +182,6 @@ object DynamicBrokerConfig { DynamicConfig.Broker.validate(baseProps) } - private[server] def addDynamicConfigs(configDef: ConfigDef): Unit = { - KafkaConfig.configKeys.forKeyValue { (configName, config) => - if (AllDynamicConfigs.contains(configName)) { - configDef.define(config.name, config.`type`, config.defaultValue, config.validator, - config.importance, config.documentation, config.group, config.orderInGroup, config.width, - config.displayName, config.dependents, config.recommender) - } - } - } - private[server] def dynamicConfigUpdateModes: util.Map[String, String] = { AllDynamicConfigs.map { name => val mode = if (PerBrokerConfigs.contains(name)) "per-broker" else "cluster-wide" From e27c2254268aa4988654d8ed1ab133a74284235e Mon Sep 17 00:00:00 2001 From: frankvicky Date: Wed, 12 Jun 2024 23:20:30 +0800 Subject: [PATCH 4/5] KAFKA-16938: Rewrite comment to make it more precise --- core/src/main/scala/kafka/server/DynamicConfig.scala | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/core/src/main/scala/kafka/server/DynamicConfig.scala b/core/src/main/scala/kafka/server/DynamicConfig.scala index 18006f5974b0d..8a604962f5b46 100644 --- a/core/src/main/scala/kafka/server/DynamicConfig.scala +++ b/core/src/main/scala/kafka/server/DynamicConfig.scala @@ -43,8 +43,7 @@ object DynamicConfig { configs } - // Non-dynamic properties are determined by subtracting dynamic broker config names from all config names. - // This is to avoid circular reference issues during initialization. + // In order to avoid circular reference, all DynamicBrokerConfig's variables which are initialized by `DynamicConfig.Broker` should be moved to `DynamicConfig.Broker`. Otherwise, those variables of DynamicBrokerConfig will see intermediate state of `DynamicConfig.Broker`, because `brokerConfigs` is created by `DynamicBrokerConfig.AllDynamicConfigs` val nonDynamicProps: Set[String] = KafkaConfig.configNames.toSet -- brokerConfigs.names.asScala def configKeys: util.Map[String, ConfigDef.ConfigKey] = brokerConfigs.configKeys From 40e992c9980f5b1342db4c75debcabea9650c931 Mon Sep 17 00:00:00 2001 From: frankvicky Date: Wed, 12 Jun 2024 23:21:51 +0800 Subject: [PATCH 5/5] KAFKA-16938: Format comment --- core/src/main/scala/kafka/server/DynamicConfig.scala | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/core/src/main/scala/kafka/server/DynamicConfig.scala b/core/src/main/scala/kafka/server/DynamicConfig.scala index 8a604962f5b46..f17efd57435e7 100644 --- a/core/src/main/scala/kafka/server/DynamicConfig.scala +++ b/core/src/main/scala/kafka/server/DynamicConfig.scala @@ -43,7 +43,8 @@ object DynamicConfig { configs } - // In order to avoid circular reference, all DynamicBrokerConfig's variables which are initialized by `DynamicConfig.Broker` should be moved to `DynamicConfig.Broker`. Otherwise, those variables of DynamicBrokerConfig will see intermediate state of `DynamicConfig.Broker`, because `brokerConfigs` is created by `DynamicBrokerConfig.AllDynamicConfigs` + // In order to avoid circular reference, all DynamicBrokerConfig's variables which are initialized by `DynamicConfig.Broker` should be moved to `DynamicConfig.Broker`. + // Otherwise, those variables of DynamicBrokerConfig will see intermediate state of `DynamicConfig.Broker`, because `brokerConfigs` is created by `DynamicBrokerConfig.AllDynamicConfigs` val nonDynamicProps: Set[String] = KafkaConfig.configNames.toSet -- brokerConfigs.names.asScala def configKeys: util.Map[String, ConfigDef.ConfigKey] = brokerConfigs.configKeys