diff --git a/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala b/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala index 22576bdceb6fd..904d04fb503d7 100755 --- a/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala +++ b/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala @@ -117,8 +117,6 @@ object DynamicBrokerConfig { AllDynamicConfigs.intersect(passwordConfigs) } - private val nonDynamicProps: Set[String] = KafkaConfig.configNames.toSet -- DynamicConfig.Broker.names.asScala - def isPasswordConfig(name: String): Boolean = DynamicBrokerConfig.DynamicPasswordConfigs.exists(name.endsWith) def brokerConfigSynonyms(name: String, matchListenerOverride: Boolean): List[String] = { @@ -168,7 +166,7 @@ object DynamicBrokerConfig { } private def nonDynamicConfigs(props: Properties): Set[String] = { - props.asScala.keySet.intersect(nonDynamicProps) + props.asScala.keySet.intersect(DynamicConfig.Broker.nonDynamicProps) } private def securityConfigsWithoutListenerPrefix(props: Properties): Set[String] = { @@ -184,16 +182,6 @@ object DynamicBrokerConfig { DynamicConfig.Broker.validate(baseProps) } - private[server] def addDynamicConfigs(configDef: ConfigDef): Unit = { - KafkaConfig.configKeys.forKeyValue { (configName, config) => - if (AllDynamicConfigs.contains(configName)) { - configDef.define(config.name, config.`type`, config.defaultValue, config.validator, - config.importance, config.documentation, config.group, config.orderInGroup, config.width, - config.displayName, config.dependents, config.recommender) - } - } - } - private[server] def dynamicConfigUpdateModes: util.Map[String, String] = { AllDynamicConfigs.map { name => val mode = if (PerBrokerConfigs.contains(name)) "per-broker" else "cluster-wide" @@ -321,7 +309,7 @@ class DynamicBrokerConfig(private val kafkaConfig: KafkaConfig) extends Logging } private def verifyReconfigurableConfigs(configNames: Set[String]): Unit = CoreUtils.inWriteLock(lock) { - val nonDynamic = configNames.intersect(nonDynamicProps) + val nonDynamic = configNames.intersect(DynamicConfig.Broker.nonDynamicProps) require(nonDynamic.isEmpty, s"Reconfigurable contains non-dynamic configs $nonDynamic") } diff --git a/core/src/main/scala/kafka/server/DynamicConfig.scala b/core/src/main/scala/kafka/server/DynamicConfig.scala index 955103bea6f8e..f17efd57435e7 100644 --- a/core/src/main/scala/kafka/server/DynamicConfig.scala +++ b/core/src/main/scala/kafka/server/DynamicConfig.scala @@ -17,6 +17,8 @@ package kafka.server +import kafka.server.DynamicBrokerConfig.AllDynamicConfigs + import java.net.{InetAddress, UnknownHostException} import java.util.Properties import org.apache.kafka.common.config.ConfigDef @@ -30,10 +32,20 @@ import scala.jdk.CollectionConverters._ * and can only be set dynamically. */ object DynamicConfig { + object Broker { + private val brokerConfigs = { + val configs = QuotaConfigs.brokerQuotaConfigs() + + // Filter and define all dynamic configurations + KafkaConfig.configKeys + .filter { case (configName, _) => AllDynamicConfigs.contains(configName) } + .foreach { case (_, config) => configs.define(config) } + configs + } - object Broker { - private val brokerConfigs = QuotaConfigs.brokerQuotaConfigs() - DynamicBrokerConfig.addDynamicConfigs(brokerConfigs) + // In order to avoid circular reference, all DynamicBrokerConfig's variables which are initialized by `DynamicConfig.Broker` should be moved to `DynamicConfig.Broker`. + // Otherwise, those variables of DynamicBrokerConfig will see intermediate state of `DynamicConfig.Broker`, because `brokerConfigs` is created by `DynamicBrokerConfig.AllDynamicConfigs` + val nonDynamicProps: Set[String] = KafkaConfig.configNames.toSet -- brokerConfigs.names.asScala def configKeys: util.Map[String, ConfigDef.ConfigKey] = brokerConfigs.configKeys