Skip to content

KAFKA-13417; Ensure dynamic reconfigurations set old config properly#11448

Merged
hachikuji merged 6 commits intoapache:trunkfrom
hachikuji:fix-dynamic-config-update-issue
Nov 9, 2021
Merged

KAFKA-13417; Ensure dynamic reconfigurations set old config properly#11448
hachikuji merged 6 commits intoapache:trunkfrom
hachikuji:fix-dynamic-config-update-issue

Conversation

@hachikuji
Copy link
Copy Markdown
Contributor

@hachikuji hachikuji commented Oct 28, 2021

This patch fixes a bug in DynamicBrokerConfig which causes some configuration changes to be ignored. In particular, the bug is the result of the reference to the old configuration getting indirectly mutated prior to the call to BrokerReconfigurable.reconfigure. This causes the first dynamic configuration update to pass effectively the same configuration as both oldConfig and newConfig. In cases such as in DynamicThreadPool, the update is ignored because the old configuration value matches the new configuration value.

Committer Checklist (excluded from commit message)

  • Verify design and implementation
  • Verify test coverage and CI build status
  • Verify documentation (including upgrade notes)

@hachikuji
Copy link
Copy Markdown
Contributor Author

Note I am planning to add some additional test cases here. We should cover all of the DynamicThreadPool configurations and unclean leader election at a minimum.

Copy link
Copy Markdown
Member

@dajac dajac left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good catch! Left a small comment.

Comment thread core/src/main/scala/kafka/server/KafkaConfig.scala Outdated

// We need a copy of the current config since `currentConfig` is initialized with `kafkaConfig`
// which means the call to `updateCurrentConfig` would end up mutating `oldConfig`.
val oldConfig = if (kafkaConfig eq currentConfig) {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@hachikuji Thanks for the PR. Do you know if there is an issue only with KRraft or is there an issue with ZK as well?

With ZK, the sequence is:

  1. Create KafkaConfig with static configs from server.properties
  2. Initialize KafkaConfig with initial configs from ZooKeeper using DynamicBrokerConfig.initialize(zkClient). This always creates a new KafkaConfig, so we don't need this check?
  3. Start DynamicConfigManager. All ZK updates after 2) are handled through change notifications.

With KRaft, there doesn't seem to be an initialize() for initializing the state from existing dynamic configs, so are all configs handled similar to 3)? In which case, we should perhaps move the initialization of currentConfig from DynamicBrokerConfig.initialize(zkClient) to somewhere common for KRaft and avoid this check here?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@rajinisivaram Hmm, good point. It looks like the bug only affects KRaft since there is no call to initialize. The code still feels a little slippery though, so maybe there is room for some defensiveness. Let me take a look.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I pushed a patch which initializes currentConfig as null in order to make the call to initialize required. Let me know if that seems like a reasonable approach.

Copy link
Copy Markdown
Contributor

@rajinisivaram rajinisivaram left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@hachikuji Thanks for the updates, LGTM.

Comment thread core/src/main/scala/kafka/server/KafkaServer.scala Outdated
private val dynamicConfigPasswordEncoder = maybeCreatePasswordEncoder(kafkaConfig.passwordEncoderSecret)

private[server] def initialize(zkClient: KafkaZkClient): Unit = {
private[server] def initialize(zkClientOpt: Option[KafkaZkClient]): Unit = {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In the non-ZK world, does the initialization using existing dynamic configs happen later? Or does the original Kafka config contain the existing dynamic configs as well?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It looks like dynamic configs get initialized later when we load the metadata log. I think this probably could be improved. Do you see any specific issues?

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In the ZK world, all ZK configs are static configs. We initialize dynamic configs very early on, once we have a ZK client that we create with ZK configs.

For the non-ZK world, I wasn't sure if the initialization required some dynamic configs. For example, we allow SSL keystore passwords to be stored in ZK prior to starting up brokers to avoid specifying any passwords in server.properties. Do we ensure that SocketServer only starts up after dynamic configs are initialized with KRaft?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Colin has an incoming KIP which will cover secret storage. It is a known gap at the moment.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sounds good.

Copy link
Copy Markdown
Contributor

@rajinisivaram rajinisivaram left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@hachikuji Thanks for the updates, LGTM

@hachikuji hachikuji merged commit 78a5e92 into apache:trunk Nov 9, 2021
stanislavkozlovski added a commit to stanislavkozlovski/kafka that referenced this pull request Nov 11, 2021
…ntegration-11-nov

* ak/trunk: (15 commits)
  KAFKA-13429: ignore bin on new modules (apache#11415)
  KAFKA-12648: introduce TopologyConfig and TaskConfig for topology-level overrides (apache#11272)
  KAFKA-12487: Add support for cooperative consumer protocol with sink connectors (apache#10563)
  MINOR: Log client disconnect events at INFO level (apache#11449)
  MINOR: Remove topic null check from `TopicIdPartition` and adjust constructor order (apache#11403)
  KAFKA-13417; Ensure dynamic reconfigurations set old config properly (apache#11448)
  MINOR: Adding a constant to denote UNKNOWN leader in LeaderAndEpoch (apache#11477)
  KAFKA-10543: Convert KTable joins to new PAPI (apache#11412)
  KAFKA-12226: Commit source task offsets without blocking on batch delivery (apache#11323)
  KAFKA-13396: Allow create topic without partition/replicaFactor (apache#11429)
  ...
hachikuji added a commit that referenced this pull request Nov 15, 2021
…11448)

This patch fixes a bug in `DynamicBrokerConfig` which causes some configuration changes to be ignored. In particular, the bug is the result of the reference to the old configuration getting indirectly mutated prior to the call to `BrokerReconfigurable.reconfigure`. This causes the first dynamic configuration update to pass effectively the same configuration as both `oldConfig` and `newConfig`. In cases such as in `DynamicThreadPool`, the update is ignored because the old configuration value matches the new configuration value.

This bug only affects KRaft. It is protected in the zk broker by the call to `DynamicBrokerConfig.initialize()`, which overwrites the stored reference to the original configuration. The patch fixes the problem by ensuring that `initialize()` is also invoked in KRaft when `BrokerServer` starts up.

Reviewers: David Jacot <djacot@confluent.io>, Rajini Sivaram <rajinisivaram@googlemail.com>
xdgrulez pushed a commit to xdgrulez/kafka that referenced this pull request Dec 22, 2021
…pache#11448)

This patch fixes a bug in `DynamicBrokerConfig` which causes some configuration changes to be ignored. In particular, the bug is the result of the reference to the old configuration getting indirectly mutated prior to the call to `BrokerReconfigurable.reconfigure`. This causes the first dynamic configuration update to pass effectively the same configuration as both `oldConfig` and `newConfig`. In cases such as in `DynamicThreadPool`, the update is ignored because the old configuration value matches the new configuration value.

This bug only affects KRaft. It is protected in the zk broker by the call to `DynamicBrokerConfig.initialize()`, which overwrites the stored reference to the original configuration. The patch fixes the problem by ensuring that `initialize()` is also invoked in KRaft when `BrokerServer` starts up.

Reviewers: David Jacot <djacot@confluent.io>, Rajini Sivaram <rajinisivaram@googlemail.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants