KAFKA-17584: Fix incorrect synonym handling for dynamic log configurations #17258

Merged
showuon merged 5 commits into trunk from KAFKA-17584
Sep 26, 2024

@cmccabe
Contributor

@cmccabe cmccabe commented Sep 23, 2024

Several Kafka log configurations have synonyms. For example, log retention can be configured by log.retention.ms, by log.retention.minutes, or by log.retention.hours. Kafka also has a facility for dynamically changing broker configurations without restarting the broker. These dynamically set configurations are stored in the metadata log and override what is in the broker properties file.

Unfortunately, these two features interacted poorly; there was a bug where the dynamic log configuration update code ignored synonyms. For example, if you set log.retention.minutes and then reconfigured something unrelated that triggered the LogConfig update path, the retention value that you had configured was overwritten.

The reason for this was incorrect handling of synonyms. The code tried to treat the Kafka broker configuration as a bag of key/value entities rather than extracting the correct retention time (or other setting with overrides) from the KafkaConfig object.
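
To illustrate the synonym problem described above, here is a minimal, self-contained Scala sketch. It is not the Kafka code: the object name RetentionSynonyms and both helper functions are hypothetical, and the real broker resolves these values through KafkaConfig. The sketch contrasts a lookup that only consults log.retention.ms (the "bag of key/value" view) with one that resolves the synonyms in precedence order: ms first, then minutes, then hours.

```scala
object RetentionSynonyms {
  // Broker-level synonyms for retention time, highest precedence first,
  // paired with the factor that converts each unit to milliseconds.
  // Special values such as -1 (no limit) are not handled in this sketch.
  private val synonyms: Seq[(String, Long)] = Seq(
    ("log.retention.ms", 1L),
    ("log.retention.minutes", 60L * 1000L),
    ("log.retention.hours", 60L * 60L * 1000L)
  )

  /** Effective retention in ms from the first synonym that is set, if any. */
  def effectiveRetentionMs(brokerProps: Map[String, String]): Option[Long] =
    synonyms.collectFirst {
      case (name, factor) if brokerProps.contains(name) =>
        brokerProps(name).toLong * factor
    }

  /** Naive view: only the primary key is consulted, synonyms are ignored. */
  def naiveRetentionMs(brokerProps: Map[String, String]): Option[Long] =
    brokerProps.get("log.retention.ms").map(_.toLong)
}
```

With only log.retention.minutes set, the naive lookup finds nothing, so a caller that treats "not found" as "use the default" would silently discard the configured retention. The synonym-aware lookup returns the configured value converted to milliseconds.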

@github-actions github-actions Bot added the core Kafka Broker label Sep 23, 2024
…tions

Separately from the above bug, the code did not honor the value of dynamically configured synonyms:
setting log.retention.minutes had no effect; only log.retention.ms was honored.
@showuon showuon self-assigned this Sep 24, 2024
Member

@showuon showuon left a comment

LGTM! Minor comment left.
The DynamicBrokerReconfigurationTest.testConfigDescribeUsingAdminClient test failed. I had a look, and it looks like we were validating the wrong thing before. log.retention.hours should not be a read-only config. It can be dynamically changed, as can log.roll.hours.

results.asScala
}

val KafkaConfigToLogConfigName: Map[String, String] = {
Member

nit: After this PR, this variable is only used in DynamicBrokerReconfigurationTest. We can move it there.

@fvaleri
Contributor

fvaleri commented Sep 24, 2024

log.retention.hours should not be a read-only config. It can be dynamically changed, as can log.roll.hours.

I think we should also update the documentation, as they are both reported as read-only.

ServerTopicConfigSynonyms.TOPIC_CONFIG_SYNONYMS.values.asScala.toSet - ServerLogConfigs.LOG_MESSAGE_FORMAT_VERSION_CONFIG
val KafkaConfigToLogConfigName: Map[String, String] =
ServerTopicConfigSynonyms.TOPIC_CONFIG_SYNONYMS.asScala.map { case (k, v) => (v, k) }
val ReconfigurableConfigs: Set[String] = {
Contributor

nit (immutable style):

  val ReconfigurableConfigs: Set[String] = {
    ServerTopicConfigSynonyms.ALL_TOPIC_CONFIG_SYNONYMS
      .values()
      .asScala
      .flatMap(v => v.asScala.map(configSynonym => configSynonym.name()))
      .filterNot(_ == ServerLogConfigs.LOG_MESSAGE_FORMAT_VERSION_CONFIG)
      .toSet
  }

@showuon
Member

showuon commented Sep 24, 2024

log.retention.hours should not be a read-only config. It can be dynamically changed, as can log.roll.hours.

I think we should also update the documentation, as they are both reported as read-only.

Good point! But the doc is actually generated, and I just confirmed, after this PR, the doc will be correctly updated:

log.retention.hours

The number of hours to keep a log file before deleting it (in hours), tertiary to log.retention.ms property
Type:	int
Default:	168
Valid Values:	
Importance:	high
Update Mode:	cluster-wide
log.retention.minutes

The number of minutes to keep a log file before deleting it (in minutes), secondary to log.retention.ms property. If not set, the value in log.retention.hours is used
Type:	int
Default:	null
Valid Values:	
Importance:	high
Update Mode:	cluster-wide
log.retention.ms

The number of milliseconds to keep a log file before deleting it (in milliseconds), If not set, the value in log.retention.minutes is used. If set to -1, no time limit is applied.
Type:	long
Default:	null
Valid Values:	
Importance:	high
Update Mode:	cluster-wide
log.roll.hours

The maximum time before a new log segment is rolled out (in hours), secondary to log.roll.ms property
Type:	int
Default:	168
Valid Values:	[1,...]
Importance:	high
Update Mode:	cluster-wide
log.roll.jitter.hours

The maximum jitter to subtract from logRollTimeMillis (in hours), secondary to log.roll.jitter.ms property
Type:	int
Default:	0
Valid Values:	[0,...]
Importance:	high
Update Mode:	cluster-wide

Comment thread core/src/main/scala/kafka/server/DynamicBrokerConfig.scala Outdated
Contributor

@clolov clolov left a comment

log.retention.hours should not be a read-only config. It can be dynamically changed, as can log.roll.hours.

I think we should also update the documentation, as they are both reported as read-only.

Good point! But the doc is actually generated, and I just confirmed, after this PR, the doc will be correctly updated

My only call out is that currently (in trunk) log.retention.hours/minutes are documented as read-only and behave as read-only. After this change they become cluster-wide in both documentation and behaviour. I am happy with this change as long as we publicly call it out in some form.

@fvaleri
Contributor

fvaleri commented Sep 24, 2024

we publicly call it out in some form

Agreed. This is what happens right now, so it would be a behavior change. I think a simple release note would be fine.

$ bin/kafka-configs.sh --bootstrap-server :9092 --entity-type brokers --entity-name 2 --alter --add-config log.retention.minutes=1
Error while executing config command with args '--bootstrap-server :9092 --entity-type brokers --entity-name 2 --alter --add-config log.retention.minutes=1'
java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.InvalidRequestException: Cannot update these configs dynamically: Set(log.retention.minutes)
	at java.base/java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:396)
	at java.base/java.util.concurrent.CompletableFuture.get(CompletableFuture.java:2096)
	at org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:180)
	at kafka.admin.ConfigCommand$.alterConfig(ConfigCommand.scala:390)
	at kafka.admin.ConfigCommand$.processCommand(ConfigCommand.scala:351)
	at kafka.admin.ConfigCommand$.main(ConfigCommand.scala:100)
	at kafka.admin.ConfigCommand.main(ConfigCommand.scala)
Caused by: org.apache.kafka.common.errors.InvalidRequestException: Cannot update these configs dynamically: Set(log.retention.minutes)

@showuon
Member

showuon commented Sep 24, 2024

I've sent a mail to the dev/user mailing lists to discuss it here. FYI.

Contributor

@fvaleri fvaleri left a comment

Thanks @showuon.

@mumrah
Member

mumrah commented Sep 24, 2024

Looks like there is a failing test:

DynamicBrokerReconfigurationTest > "testConfigDescribeUsingAdminClient(String)

@cmccabe
Contributor Author

cmccabe commented Sep 24, 2024

For now, I'm going to change it back to the current behavior where we ignore the value of dynamically configured synonyms such as log.retention.minutes, etc. We can make that change in a separate PR if people feel strongly about it. (The fact that it's been like this since 3.0 seems like a good reason to leave it for now....)

Member

@showuon showuon left a comment

LGTM! It's fine to keep the old behavior since it won't block any operation.

@kamalcph
Contributor

This issue was first reported in KAFKA-15266, and PR #14119 was opened for it.

@amangandhi94 amangandhi94 left a comment

This looks good to me and addresses the bug I had raised - https://issues.apache.org/jira/browse/KAFKA-15266

Just wondering: how can I get more attention for the bugs or PRs I raise in the future?

override def reconfigure(oldConfig: KafkaConfig, newConfig: KafkaConfig): Unit = {
val originalLogConfig = logManager.currentDefaultConfig
val originalUncleanLeaderElectionEnable = originalLogConfig.uncleanLeaderElectionEnable
val newBrokerDefaults = new util.HashMap[String, Object](originalLogConfig.originals)

Just curious: do we know why this logic was even added? I can't see what the benefit of doing this was.

Contributor Author

See discussion here #17258 (comment)

@kamalcph kamalcph removed their request for review September 25, 2024 07:00
@fvaleri
Contributor

fvaleri commented Sep 25, 2024

Just wondering: how can I get more attention for the bugs or PRs I raise in the future?

In this case, by raising the bug as a blocker, due to the potential data loss. In general, sending an email to the dev mailing list is enough to get more attention, in my experience. Hope it helps.

Contributor

@fvaleri fvaleri left a comment

LGTM. Thanks.

Contributor

@kamalcph kamalcph left a comment

LGTM

val originalUncleanLeaderElectionEnable = originalLogConfig.uncleanLeaderElectionEnable
val newBrokerDefaults = new util.HashMap[String, Object](originalLogConfig.originals)
newConfig.valuesFromThisConfig.forEach { (k, v) =>
if (DynamicLogConfig.ReconfigurableConfigs.contains(k)) {
Contributor

It has been a long time, so I may not remember correctly. But I think we used to only update configs which are ReconfigurableConfigs even if you updated ZooKeeper directly with new configs that included others. Non-reconfigurable configs would be picked up only on the next broker restart. Do we think it is safe to include all configs now either because we don't allow other configs to be updated in KRaft or because all log configs are reconfigurable now?

Contributor Author

Thanks for this comment, @rajinisivaram . I think you're right that we should be excluding changes to configurations that don't appear in DynamicLogConfig.ReconfigurableConfigs. I have added some code to do this now. I think from a practical point of view, this only affects message.format.version, which we explicitly excluded from being dynamically reconfigurable.

Do we think it is safe to include all configs now either because we don't allow other configs to be updated in KRaft or because all log configs are reconfigurable now?

KRaft has the same behavior as ZK, in that it lets you set any broker configuration you want, whether it's valid or not. There were too many people depending on this for us to change it in 3.x. Maybe in the future if someone creates a KIP...

Of course the command-line tool does its own validation.

Member

Good call @rajinisivaram !

…urable

- Add testLogRetentionTimeMinutesIsNotDynamicallyReconfigurable

- clean up some cases where we were using zkconnect but did not need to
Contributor

@rajinisivaram rajinisivaram left a comment

@cmccabe Thanks for the PR, LGTM

Contributor

@junrao junrao left a comment

@cmccabe : Thanks for the PR. Added a couple of comments.

val newBrokerDefaults = new util.HashMap[String, Object](newConfig.extractLogConfigMap)
originalLogConfig.originals().forEach((k, v) => {
if (!DynamicLogConfig.ReconfigurableConfigs.contains(k)) {
newBrokerDefaults.put(k, v)
Contributor

  1. Do we need to translate the name from KafkaConfig to LogConfig as the original code does?
  2. We started newBrokerDefaults with newConfig.extractLogConfigMap that includes non-reconfigurable configs. Where is the logic to remove them?

Contributor Author

Let me fix this a bit. It should be sufficient to just remove the non-reconfigurable configs (really, singular config)

Contributor

@junrao junrao left a comment

@cmccabe : Thanks for the updated PR. A few more comments.

val newBrokerDefaults = new util.HashMap[String, Object](newConfig.extractLogConfigMap)
val originalLogConfigMap = originalLogConfig.originals()
DynamicLogConfig.NonReconfigrableLogConfigs.foreach(k => {
Option(originalLogConfigMap.get(k)) match {
Contributor

Hmm, originalLogConfigMap is the current config. We should check from newConfig, right?

Contributor Author

Non-reconfigurable configs are copied over from the current (not new) configuration.

DynamicLogConfig.NonReconfigrableLogConfigs.foreach(k => {
Option(originalLogConfigMap.get(k)) match {
case None => newBrokerDefaults.remove(k)
case Some(v) => newBrokerDefaults.put(k, v)
Contributor

Not sure that I follow here. Why are we putting a non-reconfigurable config to newBrokerDefaults?

Contributor Author

Because we want the non-reconfigurable configuration to have the same value (or lack of value) that it had previously. It should not change.
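
The intent described in this reply can be sketched as follows. This is a simplified model using immutable Scala maps, not the code in DynamicBrokerConfig.scala; the object PinNonReconfigurable, the merge function, and the sample values in the usage note are hypothetical.

```scala
object PinNonReconfigurable {
  /**
   * Starts from the proposed log config map, then forces every key in
   * `nonReconfigurable` back to the state it has in `current`:
   * the same value if it was present, absent if it was absent.
   */
  def merge(
      current: Map[String, String],
      proposed: Map[String, String],
      nonReconfigurable: Set[String]
  ): Map[String, String] =
    nonReconfigurable.foldLeft(proposed) { (acc, key) =>
      current.get(key) match {
        case Some(value) => acc.updated(key, value) // keep the previous value
        case None        => acc - key               // keep it unset
      }
    }
}
```

For example, with current = Map("retention.ms" -> "1000", "message.format.version" -> "A") and proposed = Map("retention.ms" -> "2000", "message.format.version" -> "B"), pinning "message.format.version" yields retention.ms = 2000 while message.format.version stays at "A". Reconfigurable keys follow the new configuration; pinned keys never change.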

case Some(v) => newBrokerDefaults.put(k, v)
}
}
})
Contributor

newBrokerDefaults has the config name with log prefix, right? Should we translate the name from KafkaConfig to LogConfig as the original code does?

Contributor Author

newBrokerDefaults has the config name with log prefix, right

No. It comes from KafkaConfig.extractLogConfigMap, which creates a map of log (aka topic) configurations, not broker configurations.
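
The difference between broker-level names (with the log. prefix) and log (topic) level names can be illustrated with a small sketch. The three name pairs below are real synonyms, but the object and function names are hypothetical, and the mapping is a hand-picked subset rather than the full ServerTopicConfigSynonyms table.

```scala
object ConfigNameTranslation {
  // Topic-level (LogConfig) name -> broker-level (KafkaConfig) name.
  // Hand-picked subset, for illustration only.
  val topicToBroker: Map[String, String] = Map(
    ("retention.ms", "log.retention.ms"),
    ("segment.bytes", "log.segment.bytes"),
    ("cleanup.policy", "log.cleanup.policy")
  )

  // Inverted view: broker-level name -> topic-level name.
  val brokerToTopic: Map[String, String] =
    topicToBroker.map { case (topic, broker) => (broker, topic) }

  /** Keeps only entries with a known topic-level synonym, renamed to it. */
  def toTopicNames(brokerProps: Map[String, String]): Map[String, String] =
    brokerProps.collect {
      case (name, value) if brokerToTopic.contains(name) =>
        (brokerToTopic(name), value)
    }
}
```

A map that has already been translated this way is keyed by names such as retention.ms, so no further renaming is needed when it is used as the log defaults. Broker-only settings such as num.io.threads simply drop out.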

Contributor

@junrao junrao left a comment

@cmccabe : Thanks for the explanation. LGTM

@showuon
Member

showuon commented Sep 26, 2024

All tests passed.

@showuon showuon merged commit 3d23029 into trunk Sep 26, 2024
showuon pushed a commit that referenced this pull request Sep 26, 2024
…tions (#17258)

Reviewers: Luke Chen <showuon@gmail.com>, Jun Rao <junrao@gmail.com>, Kamal Chandraprakash <kamal.chandraprakash@gmail.com>, Christo Lolov <lolovc@amazon.com>, Federico Valeri <fedevaleri@gmail.com>, Rajini Sivaram <rajinisivaram@googlemail.com>, amangandhi94 <>
showuon pushed a commit to showuon/kafka that referenced this pull request Sep 26, 2024
…tions (apache#17258)

showuon added a commit that referenced this pull request Sep 26, 2024
…tions (#17258) (#17278)

Co-authored-by: Colin Patrick McCabe <cmccabe@apache.org>

Reviewers: Josep Prat <josep.prat@aiven.io>
mjsax pushed a commit that referenced this pull request Nov 25, 2024
…tions

This is a cherry-pick of #17258 to 3.7.2

This commit differs from the original by using the old (read 3.7) references to the configurations and not changing as many unit tests

Reviewers: Divij Vaidya <diviv@amazon.com>, Colin Patrick McCabe <cmccabe@apache.org>
tedyu pushed a commit to tedyu/kafka that referenced this pull request Jan 6, 2025
…tions (apache#17258)
