KAFKA-13228; Ensure ApiVersionRequest is properly handled in KRaft co-resident mode #11784
Conversation
This is the source of the bug, we always assume currentNodeApiVersions=ApiKeys.zkBrokerApis
I want to extend ClusterTestKit to add a new clusterType=Type.CO_KRAFT, but this is not a minor work.
Looks like there are a few checkstyle issues in this file
Are these changes just cleaning up? Did we not want to add an extra parameter to DefaultAlterIsrManager?
Yes, this is just a cleanup.
Currently, we have several BrokerToControllerChannelManagers: for AlterIsrManager, LifecycleManager, AutoTopicCreateManager and ForwardingManager. My cleanup here has 2 benefits:
- Create and manage them all in `BrokerServer` or `KafkaServer`
- Avoid creating a different `ControllerNodeProvider` for each of them
Unless I'm misreading the code, it seems like this was not None before. Are there implications for adding the None case?
The NodeApiVersions is used in BrokerToControllerChannelManager to get the controller ApiVersions only if the current node is both the broker and the controller (which only happens in KRaft co-resident mode or ZK mode); otherwise it will connect to the controller and send an ApiVersionsRequest to get them. In the other cases it is not used, so I just set it to None, and it will make no difference if I change it to Some(controllerApiVersion).
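To make that short-circuit concrete, here is a minimal, self-contained sketch. The class and field names below are hypothetical stand-ins, not Kafka's actual API: the controller's ApiVersions are only known locally when the active controller is this very node; otherwise the channel manager has to negotiate them over the wire.

```java
import java.util.Optional;

// Hypothetical model of the short-circuit described above: return locally
// known API versions only when the active controller is co-resident with
// this node.
class ControllerApiVersionsLookup {
    private final int localNodeId;
    private final String localApiVersions; // stand-in for NodeApiVersions

    ControllerApiVersionsLookup(int localNodeId, String localApiVersions) {
        this.localNodeId = localNodeId;
        this.localApiVersions = localApiVersions;
    }

    // Empty means: not known locally, send an ApiVersionsRequest instead.
    Optional<String> controllerApiVersions(int activeControllerId) {
        if (activeControllerId == localNodeId) {
            return Optional.of(localApiVersions); // co-resident case
        }
        return Optional.empty(); // remote controller: negotiate over the network
    }
}
```

The bug this PR fixes is about *which* API key set that local value is built from, not about the short-circuit itself.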
hachikuji left a comment
@dengziming Thanks, this looks like a good fix. Would you mind rebasing?
hachikuji left a comment
Thanks for rebasing. Left a couple comments.
```scala
def apply(
  config: KafkaConfig,
  metadataCache: MetadataCache,
  alterPartitionChannelManager: BrokerToControllerChannelManager,
```
nit: I think alterPartition is redundant here. Could we call it channelManager?
```diff
 requestThread.activeControllerAddress().flatMap { activeController =>
   if (activeController.id == config.brokerId)
-    Some(currentNodeApiVersions)
+    currentNodeControllerApiVersions
```
I wonder if this optimization is worth preserving. Any harm using the same negotiation even when the controller is co-resident?
I didn't find any harm in using the same negotiation, so I removed this optimization here. Yet we should notice one thing: the NetworkClient will only send an ApiVersionsRequest to a broker when it connects to it, and will not refresh it before reconnecting. This assumption is no longer absolutely right, since the broker will intersect the controller ApiVersions, and the controller ApiVersions could change.
If I understand correctly, you are saying that the range of supported api versions may change while a client is still connected to the broker. In general it's only a problem if the range of supported versions is reduced (e.g. in a downgrade). The way the code is supposed to handle this is by catching the unsupported version in the Envelope response and disconnecting the client in order to ensure that it reconnects and gets the latest range of api version support. Check KafkaApis.handleInvalidVersionsDuringForwarding and see if that logic makes sense. I do not know if we have testing for this scenario.
Yes, there may be a similar problem with finalized features since they will also change, I will check KafkaApis.handleInvalidVersionsDuringForwarding to inspect it.
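The disconnect-on-mismatch behavior discussed above can be sketched roughly as follows. All names here are illustrative, not the real KafkaApis code: when a forwarded request fails with UNSUPPORTED_VERSION, the broker closes the client connection so the client re-fetches the supported version range on reconnect.

```java
// Illustrative sketch only; not Kafka's actual classes.
enum ErrorCode { NONE, UNSUPPORTED_VERSION }

class ClientConnection {
    boolean open = true;
    void close() { open = false; }
}

class ForwardingErrorHandler {
    // Mirrors the idea behind KafkaApis.handleInvalidVersionsDuringForwarding:
    // a reduced version range on the controller surfaces as UNSUPPORTED_VERSION
    // in the envelope response, and the remedy is to drop the client
    // connection. Returns true if the connection was closed.
    boolean handle(ClientConnection connection, ErrorCode envelopeError) {
        if (envelopeError == ErrorCode.UNSUPPORTED_VERSION) {
            connection.close(); // client reconnects and re-negotiates versions
            return true;
        }
        return false;
    }
}
```

The key design point is that only a *reduced* version range (e.g. a downgrade) is dangerous, and a forced reconnect is what brings the client's cached range back in sync.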
hachikuji left a comment
Thanks, LGTM overall. Just one minor comment.
```diff
 // Start alter partition manager based on the IBP version
-alterIsrManager = if (config.interBrokerProtocolVersion.isAlterPartitionSupported) {
+alterPartitionManager = if (config.interBrokerProtocolVersion.isAlterPartitionSupported) {
   val alterPartitionChannelManager = BrokerToControllerChannelManager(
```
nit: I think I favor creating the BrokerToControllerChannelManager in AlterPartitionManager.apply. The main thing is that it makes the ownership clearer. Currently DefaultAlterPartitionManager is responsible for starting and stopping the channel manager. We can revise BrokerServer to use the same apply method (I am not sure why we didn't do that).
I guess we create the BrokerToControllerChannelManager in BrokerServer because we wanted to consolidate these channels to use only one, #10135 (comment),
but it's still too early to consolidate the channels now, so I moved it to AlterPartitionManager.apply as you suggested.
Consolidation is a little difficult because of head-of-line blocking (sigh). It might be reasonable to combine the alter partition and broker lifecycle channel managers though. Anyway, this is definitely a separate patch.
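The ownership point above can be illustrated with a small sketch. The names are hypothetical and the real signatures differ: when the factory method creates the channel manager itself, the component that starts it is also the one that shuts it down.

```java
// Illustrative sketch of factory-owned lifecycle; not Kafka's real API.
class ChannelManagerSketch {
    boolean started = false;
    void start() { started = true; }
    void shutdown() { started = false; }
}

class AlterPartitionManagerSketch {
    private final ChannelManagerSketch channelManager;

    private AlterPartitionManagerSketch(ChannelManagerSketch cm) {
        this.channelManager = cm;
    }

    // The factory builds the channel manager it will own, analogous to
    // creating the BrokerToControllerChannelManager inside
    // AlterPartitionManager.apply rather than in the server.
    static AlterPartitionManagerSketch apply() {
        return new AlterPartitionManagerSketch(new ChannelManagerSketch());
    }

    void start() { channelManager.start(); }
    void shutdown() { channelManager.shutdown(); }
    boolean channelStarted() { return channelManager.started; }
}
```

With this shape, no caller can start the channel without also being responsible for stopping it, which is the "clearer ownership" the review asks for.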
hachikuji left a comment
LGTM. Thanks for the patch!
@dengziming Hope you don't mind, but I pushed a simple fix for an NPE.

Thanks, this fix is reasonable. 👍
More detailed description of your change
This is based on #12157
Normally, a broker connects to the active controller to learn its ApiVersions; in co-resident KRaft mode, a broker gets the ApiVersions directly from the code running in the same process if the broker is also the active controller.
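The essence of the fix can be sketched like this. The enum, class, and return values below are hypothetical placeholders (the real code builds a NodeApiVersions from an ApiKeys set): the locally advertised API set must be chosen by the node's role instead of always defaulting to the ZK-broker set.

```java
// Hypothetical sketch: pick the API set by node role rather than always
// assuming the ZK broker APIs (which was the bug described above).
enum NodeRole { ZK_BROKER, KRAFT_BROKER, KRAFT_CONTROLLER }

class LocalApiVersionsSource {
    static String apiSetFor(NodeRole role) {
        switch (role) {
            case KRAFT_CONTROLLER:
                return "controllerApis";   // what co-resident mode should use
            case KRAFT_BROKER:
                return "kraftBrokerApis";
            default:
                return "zkBrokerApis";     // the set the buggy code always used
        }
    }
}
```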
But we only consider `ApiKeys.zkBrokerApis()` when we call `NodeApiVersions.create()`; we should use `ApiKeys.controllerApiVersions` when in KRaft mode.

Summary of testing strategy (including rationale)
When I described the quorum in KRaft mode I got `org.apache.kafka.common.errors.UnsupportedVersionException: The broker does not support DESCRIBE_QUORUM.` After this change, the DESCRIBE_QUORUM request was properly handled and got a correct response.

And I also added an integration test for this.
Committer Checklist (excluded from commit message)