Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 9 additions & 8 deletions core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
Original file line number Diff line number Diff line change
Expand Up @@ -110,13 +110,15 @@ object DynamicBrokerConfig {

private val ReloadableFileConfigs = Set(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG)

val ListenerConfigRegex = """listener\.name\.[^.]*\.(.*)""".r
private val ListenerConfigRegex = """listener\.name\.[^.]*\.(.*)""".r

private val DynamicPasswordConfigs = {
val passwordConfigs = KafkaConfig.configKeys.filter(_._2.`type` == ConfigDef.Type.PASSWORD).keySet
AllDynamicConfigs.intersect(passwordConfigs)
}

private val nonDynamicProps: Set[String] = KafkaConfig.configNames.toSet -- DynamicConfig.Broker.names.asScala

def isPasswordConfig(name: String): Boolean = DynamicBrokerConfig.DynamicPasswordConfigs.exists(name.endsWith)

def brokerConfigSynonyms(name: String, matchListenerOverride: Boolean): List[String] = {
Expand Down Expand Up @@ -166,7 +168,7 @@ object DynamicBrokerConfig {
}

private def nonDynamicConfigs(props: Properties): Set[String] = {
props.asScala.keySet.intersect(DynamicConfig.Broker.nonDynamicProps)
props.asScala.keySet.intersect(nonDynamicProps)
}

private def securityConfigsWithoutListenerPrefix(props: Properties): Set[String] = {
Expand Down Expand Up @@ -319,7 +321,7 @@ class DynamicBrokerConfig(private val kafkaConfig: KafkaConfig) extends Logging
}

private def verifyReconfigurableConfigs(configNames: Set[String]): Unit = CoreUtils.inWriteLock(lock) {
val nonDynamic = configNames.filter(DynamicConfig.Broker.nonDynamicProps.contains)
val nonDynamic = configNames.intersect(nonDynamicProps)
require(nonDynamic.isEmpty, s"Reconfigurable contains non-dynamic configs $nonDynamic")
}

Expand Down Expand Up @@ -674,11 +676,10 @@ trait BrokerReconfigurable {
object DynamicLogConfig {
// Exclude message.format.version for now since we need to check that the version
// is supported on all brokers in the cluster.
@nowarn("cat=deprecation")
val ExcludedConfigs = Set(ServerLogConfigs.LOG_MESSAGE_FORMAT_VERSION_CONFIG)
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need this variable? Maybe we can remove it by following change.

  // Exclude message.format.version for now since we need to check that the version
  // is supported on all brokers in the cluster.
  @nowarn("cat=deprecation")
  val ReconfigurableConfigs = ServerTopicConfigSynonyms.TOPIC_CONFIG_SYNONYMS.values.asScala.toSet - ServerLogConfigs.LOG_MESSAGE_FORMAT_VERSION_CONFIG

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've inlined this variable but we need to keep it as a Set for the - operator to work

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't run it actually, but IIRC "-" operator should work. See following screenshot

image

Please correct me for my Scala education 😄

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My bad, you are correct! I was trying to use the -- operator, hence why it needed both operands to be Sets. With - we don't need the Set. Thanks


val ReconfigurableConfigs = ServerTopicConfigSynonyms.TOPIC_CONFIG_SYNONYMS.values.asScala.toSet -- ExcludedConfigs
val KafkaConfigToLogConfigName = ServerTopicConfigSynonyms.TOPIC_CONFIG_SYNONYMS.asScala.map { case (k, v) => (v, k) }
val ReconfigurableConfigs: Set[String] =
ServerTopicConfigSynonyms.TOPIC_CONFIG_SYNONYMS.values.asScala.toSet - ServerLogConfigs.LOG_MESSAGE_FORMAT_VERSION_CONFIG
val KafkaConfigToLogConfigName: Map[String, String] =
ServerTopicConfigSynonyms.TOPIC_CONFIG_SYNONYMS.asScala.map { case (k, v) => (v, k) }
}

class DynamicLogConfig(logManager: LogManager, server: KafkaBroker) extends BrokerReconfigurable with Logging {
Expand Down
41 changes: 17 additions & 24 deletions core/src/main/scala/kafka/server/DynamicConfig.scala
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,6 @@ package kafka.server
import java.net.{InetAddress, UnknownHostException}
import java.util.Properties
import org.apache.kafka.common.config.ConfigDef
import org.apache.kafka.common.config.ConfigDef.Importance._
import org.apache.kafka.common.config.ConfigDef.Range._
import org.apache.kafka.common.config.ConfigDef.Type._
import org.apache.kafka.server.config.{QuotaConfigs, ZooKeeperInternals}

import java.util
Expand All @@ -35,48 +32,44 @@ import scala.jdk.CollectionConverters._
object DynamicConfig {

object Broker {
// Definitions
val brokerConfigDef = new ConfigDef()
// Round minimum value down, to make it easier for users.
.define(QuotaConfigs.LEADER_REPLICATION_THROTTLED_RATE_CONFIG, LONG, QuotaConfigs.QUOTA_BYTES_PER_SECOND_DEFAULT, atLeast(0), MEDIUM, QuotaConfigs.LEADER_REPLICATION_THROTTLED_RATE_DOC)
.define(QuotaConfigs.FOLLOWER_REPLICATION_THROTTLED_RATE_CONFIG, LONG, QuotaConfigs.QUOTA_BYTES_PER_SECOND_DEFAULT, atLeast(0), MEDIUM, QuotaConfigs.FOLLOWER_REPLICATION_THROTTLED_RATE_DOC)
.define(QuotaConfigs.REPLICA_ALTER_LOG_DIRS_IO_MAX_BYTES_PER_SECOND_CONFIG, LONG, QuotaConfigs.QUOTA_BYTES_PER_SECOND_DEFAULT, atLeast(0), MEDIUM, QuotaConfigs.REPLICA_ALTER_LOG_DIRS_IO_MAX_BYTES_PER_SECOND_DOC)
DynamicBrokerConfig.addDynamicConfigs(brokerConfigDef)
val nonDynamicProps = KafkaConfig.configNames.toSet -- brokerConfigDef.names.asScala

def names = brokerConfigDef.names

def validate(props: Properties) = DynamicConfig.validate(brokerConfigDef, props, customPropsAllowed = true)
private val brokerConfigs = QuotaConfigs.brokerQuotaConfigs()
DynamicBrokerConfig.addDynamicConfigs(brokerConfigs)

def configKeys: util.Map[String, ConfigDef.ConfigKey] = brokerConfigs.configKeys

def names: util.Set[String] = brokerConfigs.names

def validate(props: Properties): util.Map[String, AnyRef] = DynamicConfig.validate(brokerConfigs, props, customPropsAllowed = true)
}

object Client {
private val clientConfigs = QuotaConfigs.userAndClientQuotaConfigs()

def configKeys = clientConfigs.configKeys
def configKeys: util.Map[String, ConfigDef.ConfigKey] = clientConfigs.configKeys

def names = clientConfigs.names
def names: util.Set[String] = clientConfigs.names

def validate(props: Properties) = DynamicConfig.validate(clientConfigs, props, customPropsAllowed = false)
def validate(props: Properties): util.Map[String, AnyRef] = DynamicConfig.validate(clientConfigs, props, customPropsAllowed = false)
}

object User {
private val userConfigs = QuotaConfigs.scramMechanismsPlusUserAndClientQuotaConfigs()

def configKeys = userConfigs.configKeys
def configKeys: util.Map[String, ConfigDef.ConfigKey] = userConfigs.configKeys

def names = userConfigs.names
def names: util.Set[String] = userConfigs.names

def validate(props: Properties) = DynamicConfig.validate(userConfigs, props, customPropsAllowed = false)
def validate(props: Properties): util.Map[String, AnyRef] = DynamicConfig.validate(userConfigs, props, customPropsAllowed = false)
}

object Ip {
private val ipConfigs = QuotaConfigs.ipConfigs()

def configKeys = ipConfigs.configKeys
def configKeys: util.Map[String, ConfigDef.ConfigKey] = ipConfigs.configKeys

def names = ipConfigs.names
def names: util.Set[String] = ipConfigs.names

def validate(props: Properties) = DynamicConfig.validate(ipConfigs, props, customPropsAllowed = false)
def validate(props: Properties): util.Map[String, AnyRef] = DynamicConfig.validate(ipConfigs, props, customPropsAllowed = false)

def isValidIpEntity(ip: String): Boolean = {
if (ip != ZooKeeperInternals.DEFAULT_STRING) {
Expand Down
2 changes: 1 addition & 1 deletion core/src/main/scala/kafka/server/KafkaConfig.scala
Original file line number Diff line number Diff line change
Expand Up @@ -497,7 +497,7 @@ object KafkaConfig {
if (configType != null) {
Some(configType)
} else {
val configKey = DynamicConfig.Broker.brokerConfigDef.configKeys().get(exactName)
val configKey = DynamicConfig.Broker.configKeys.get(exactName)
if (configKey != null) {
Some(configKey.`type`)
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,20 @@ private static void buildUserClientQuotaConfigDef(ConfigDef configDef) {
ConfigDef.Importance.MEDIUM, CONTROLLER_MUTATION_RATE_DOC);
}

public static ConfigDef brokerQuotaConfigs() {
return new ConfigDef()
// Round minimum value down, to make it easier for users.
.define(QuotaConfigs.LEADER_REPLICATION_THROTTLED_RATE_CONFIG, ConfigDef.Type.LONG,
QuotaConfigs.QUOTA_BYTES_PER_SECOND_DEFAULT, ConfigDef.Range.atLeast(0),
ConfigDef.Importance.MEDIUM, QuotaConfigs.LEADER_REPLICATION_THROTTLED_RATE_DOC)
.define(QuotaConfigs.FOLLOWER_REPLICATION_THROTTLED_RATE_CONFIG, ConfigDef.Type.LONG,
QuotaConfigs.QUOTA_BYTES_PER_SECOND_DEFAULT, ConfigDef.Range.atLeast(0),
ConfigDef.Importance.MEDIUM, QuotaConfigs.FOLLOWER_REPLICATION_THROTTLED_RATE_DOC)
.define(QuotaConfigs.REPLICA_ALTER_LOG_DIRS_IO_MAX_BYTES_PER_SECOND_CONFIG, ConfigDef.Type.LONG,
QuotaConfigs.QUOTA_BYTES_PER_SECOND_DEFAULT, ConfigDef.Range.atLeast(0),
ConfigDef.Importance.MEDIUM, QuotaConfigs.REPLICA_ALTER_LOG_DIRS_IO_MAX_BYTES_PER_SECOND_DOC);
}

public static ConfigDef userAndClientQuotaConfigs() {
ConfigDef configDef = new ConfigDef();
buildUserClientQuotaConfigDef(configDef);
Expand Down