Skip to content

KAFKA-15369: Implement KIP-919: Allow AC to Talk Directly with Controllers#14306

Merged
cmccabe merged 14 commits intoapache:trunkfrom
cmccabe:cmccabe_kip_919
Sep 7, 2023
Merged

KAFKA-15369: Implement KIP-919: Allow AC to Talk Directly with Controllers#14306
cmccabe merged 14 commits intoapache:trunkfrom
cmccabe:cmccabe_kip_919

Conversation

@cmccabe
Copy link
Copy Markdown
Contributor

@cmccabe cmccabe commented Aug 29, 2023

Implement KIP-919: Allow AdminClient to Talk Directly with the KRaft Controller Quorum and add Controller Registration. This KIP adds a new version of DescribeClusterRequest which is supported by KRaft controllers. It also teaches AdminClient how to use this new DESCRIBE_CLUSTER request to talk directly with the controller quorum. This is all gated behind a new MetadataVersion, IBP_3_7_IV0.

In order to share the DESCRIBE_CLUSTER logic between broker and controller, this PR factors it out into AuthHelper.computeDescribeClusterResponse.

The KIP adds three new errors codes: MISMATCHED_ENDPOINT_TYPE, UNSUPPORTED_ENDPOINT_TYPE, and UNKNOWN_CONTROLLER_ID. The endpoint type errors can be returned from DescribeClusterRequest

On the controller side, the controllers now try to register themselves with the current active controller, by sending a CONTROLLER_REGISTRATION request. This, in turn, is converted into a RegisterControllerRecord by the active controller. ClusterImage, ClusterDelta, and all other associated classes have been upgraded to propagate the new metadata. In the metadata shell, the cluster directory now contains both broker and controller subdirectories.

QuorumFeatures previously had a reference to the ApiVersions structure used by the controller's NetworkClient. Because this PR removes that reference, QuorumFeatures now contains only immutable data. Specifically, it contains the current node ID, the locally supported features, and the list of quorum node IDs in the cluster.

…llers

Implement KIP-919: Allow AdminClient to Talk Directly with the KRaft Controller Quorum and add
Controller Registration. This KIP adds a new version of DescribeClusterRequest which is supported
by KRaft controllers. It also teaches AdminClient how to use this new DESCRIBE_CLUSTER request to
talk directly with the controller quorum. This is all gated behind a new MetadataVersion,
IBP_3_7_IV0.

In order to share the DESCRIBE_CLUSTER logic between broker and controller, this PR factors it out
into AuthHelper.computeDescribeClusterResponse.

The KIP adds three new errors codes: MISMATCHED_ENDPOINT_TYPE, UNSUPPORTED_ENDPOINT_TYPE, and
UNKNOWN_CONTROLLER_ID. The endpoint type errors can be returned from DescribeClusterRequest

On the controller side, the controllers now try to register themselves with the current active
controller, by sending a CONTROLLER_REGISTRATION request. This, in turn, is converted into a
RegisterControllerRecord by the active controller. ClusterImage, ClusterDelta, and all other
associated classes have been upgraded to propagate the new metadata. In the metadata shell, the
cluster directory now contains both broker and controller subdirectories.

QuorumFeatures previously had a reference to the ApiVersions structure used by the controller's
NetworkClient. Because this PR removes that reference, QuorumFeatures now contains only immutable
data. Specifically, it contains the current node ID, the locally supported features, and the list
of quorum node IDs in the cluster.
@cmccabe cmccabe added the kraft label Aug 29, 2023
@showuon showuon self-assigned this Aug 30, 2023
Comment thread core/src/main/scala/kafka/server/AuthHelper.scala
Comment on lines +15 to +22
log4j.rootLogger=OFF, stdout
log4j.rootLogger=DEBUG, stdout

log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=[%d] %p %m (%c:%L)%n

log4j.logger.kafka=WARN
log4j.logger.org.apache.kafka=WARN
log4j.logger.kafka=DEBUG
log4j.logger.org.apache.kafka=DEBUG
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we revert them?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes, this should be reverted. I'll fix it now.

}

private def rpcStats(manager: ControllerRegistrationManager): (Long, Long, Long) = {
val failedAttempts = new CompletableFuture[(Long, Long, Long)]
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What does the variable name failedAttempted mean here?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

number of times we've tried to send an RPC and failed for some reason (or gotten back an error code)

Copy link
Copy Markdown
Member

@mumrah mumrah left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @cmccabe! Left some comments inline.


One question I had was about de-registration (or lack thereof). How do we deal with one controller being offline? Do we just use its most recent registration?

If two controllers are down, then no metadata gets updated (or published?) since we don't have a leader. But if we have two out of three, what happens if we make a decision based on the old registration and then that controller comes back up in a different configuration?

E.g.,

  1. Three controllers A, B, C register at metadata.version X
  2. Controller A goes down
  3. Operator bumps up metadata.version to X+1, this works because the active controller looks at A's previous registration
  4. Controller A comes back up with older software that doesn't support X (or maybe B and C upgrade to newer software)
  5. Controller A replays FeatureLevelRecord X+1 and crashes

I vaguely recall discussing something like this during the KIP, but I just wanted to confirm the expected behavior here.

Comment on lines +1028 to +1029
// Unlike on the broker, DESCRIBE_CLUSTER on the controller requires a high level of
// permissions (ALTER on CLUSTER).
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What's the reason behind this?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

direct-to-controller operation is intended only for administrators. if you're not an admin, you should talk to the brokers. (In a well-run network, this should also be enforced by putting non-administrators on a separate subnet.)

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for this excellent explanation, @cmccabe! I've created JIRA issue https://issues.apache.org/jira/browse/KAFKA-18434 to incorporate this explanation into the error message. This will help users understand the permission distinctions when using controllers as bootstrap servers.

import scala.jdk.CollectionConverters._

/**
* The broker lifecycle manager owns the broker state.
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This and other javadocs in this class need updating (looks like copy-pasta from BrokerLifecycleManager 🙃)

Copy link
Copy Markdown
Contributor Author

@cmccabe cmccabe Aug 31, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

;)

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fixed

manifest: LoaderManifest
): Unit = {
if (delta.featuresDelta() != null ||
(delta.clusterDelta() != null && delta.clusterDelta().changedControllers().containsKey(nodeId))) {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does the controller only care about its own registration in this class? Is that why we're filtering by nodeId here?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes, it cares only about its own registration here

/**
* The number of RPCs that we are waiting for. Only read or written from the event queue thread.
*/
var pendingRpcs = 0L
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Seems like we only allow one pending RPC at a time. Should we use a boolean here instead?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok

registered = false
} else {
info(s"Our registration has been persisted to the metadata log.")
registered = true
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Instead of registered, how about something like registeredInLog or something more specific. Since we're sending "Registration" RPCs, someone might interpret "registered" to mean that we've sent a registration request or something.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah. i'll change it to registeredInLog


// Set up the controller registrations publisher.
metadataPublishers.add(controllerRegistrationsPublisher)

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does the registration manager need to be added to the metadataPublishers?

.setHost(broker.host)
.setPort(broker.port)
.setRack(broker.rack))
val response = authHelper.computeDescribeClusterResponse(
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need to handle the case of someone requesting Controller endpoints from the broker (and vice versa)? Or is that handled somewhere else?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That case is handled in AuthHelper.computeDescribeClusterResponse

public interface ClusterSupportDescriber {
Iterator<Entry<Integer, Map<String, VersionRange>>> brokerSupported();
Iterator<Entry<Integer, Map<String, VersionRange>>> controllerSupported();
} No newline at end of file
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: missing a blank line at bottom of file

import org.apache.kafka.metadata.VersionRange;


public interface ClusterSupportDescriber {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How about ClusterFeatureDescriber, ClusterFeatureSupport, or something with "feature" in the name

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ClusterFeatureSupportDescriber is fine, I guess. A bit long, but descriptive.

int numBrokersChecked = 0;
int numControllersChecked = 0;
Optional<String> reason = quorumFeatures.reasonNotLocallySupported(featureName, newVersion);
if (reason.isPresent()) return reason;
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: reformat this? (I don't think we normally do one-liner if statements?)

Copy link
Copy Markdown
Contributor Author

@cmccabe cmccabe Aug 31, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ehhh... we normally do allow them.

% git grep -w return | grep -w if | wc -l
    1470

I think the rule is that you need braces for any if statement that spans multiple lines. Also if it gets (horizontally) long, you should cut that out.

We should probably teach checkstyle about this at some point, or at least write it down

@cmccabe
Copy link
Copy Markdown
Contributor Author

cmccabe commented Aug 31, 2023

One question I had was about de-registration (or lack thereof). How do we deal with one controller being offline? Do we just use its most recent registration?

We just use the most recent registration. Like you said, it's possible that this is out of date. But there's no getting around that in a distributed system.

More generally, we can't prevent people from bringing up nodes with old software. If you want to break your cluster by doing that, you can do it, and there's nothing we can do about it. We don't control the version of software deployed on your cluster.

The safety interlock is designed for the case where the admin is being "reasonable"

Comment on lines +397 to +400
* @param context The controller request context.
* @param request The registration request.
*
* @return A future yielding the broker registration reply.
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: alignment

Comment on lines +25 to +31
/*
@Test
public void testInitialControllers() {
ControllerRegistrationsPublisher publisher = new ControllerRegistrationsPublisher();
assertEquals(Collections.emptyMap(), publisher.controllers());
}

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

All commented out?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fixed

Copy link
Copy Markdown
Member

@dengziming dengziming left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for this change, I left 2 minor comments.

private final ControllerRegistrationRequestData data;

public Builder(ControllerRegistrationRequestData data) {
super(ApiKeys.BROKER_HEARTBEAT);
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this should be CONTROLLER_REGISTRATION

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice catch, @dengziming !

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, nice catch. Fixed.

// limitations under the License.

{
"apiKey": 70,
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we don't have 69, why use 70?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

edit: now using 70 again since someone claimed 69 :)

}

public short registerControllerRecordVersion() {
if (isAtLeast(MetadataVersion.IBP_3_6_IV2)) {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We are not �using IBP_3_7_IV0, does this mean a old-version controller can register a controller with an new-version controller?

Copy link
Copy Markdown
Contributor Author

@cmccabe cmccabe Sep 5, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry, this was left over from when the feature was in IBP_3_6_IV2 previously. Fixed now to be IBP_3_7_IV0

Copy link
Copy Markdown
Member

@showuon showuon left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM!

@cmccabe
Copy link
Copy Markdown
Contributor Author

cmccabe commented Sep 7, 2023

Fixed conflicts

@cmccabe cmccabe merged commit 41b695b into apache:trunk Sep 7, 2023
@cmccabe cmccabe deleted the cmccabe_kip_919 branch September 7, 2023 22:22
AnatolyPopov pushed a commit to aiven/kafka that referenced this pull request Feb 16, 2024
…llers (apache#14306)

Implement KIP-919: Allow AdminClient to Talk Directly with the KRaft Controller Quorum and add
Controller Registration. This KIP adds a new version of DescribeClusterRequest which is supported
by KRaft controllers. It also teaches AdminClient how to use this new DESCRIBE_CLUSTER request to
talk directly with the controller quorum. This is all gated behind a new MetadataVersion,
IBP_3_7_IV0.

In order to share the DESCRIBE_CLUSTER logic between broker and controller, this PR factors it out
into AuthHelper.computeDescribeClusterResponse.

The KIP adds three new errors codes: MISMATCHED_ENDPOINT_TYPE, UNSUPPORTED_ENDPOINT_TYPE, and
UNKNOWN_CONTROLLER_ID. The endpoint type errors can be returned from DescribeClusterRequest

On the controller side, the controllers now try to register themselves with the current active
controller, by sending a CONTROLLER_REGISTRATION request. This, in turn, is converted into a
RegisterControllerRecord by the active controller. ClusterImage, ClusterDelta, and all other
associated classes have been upgraded to propagate the new metadata. In the metadata shell, the
cluster directory now contains both broker and controller subdirectories.

QuorumFeatures previously had a reference to the ApiVersions structure used by the controller's
NetworkClient. Because this PR removes that reference, QuorumFeatures now contains only immutable
data. Specifically, it contains the current node ID, the locally supported features, and the list
of quorum node IDs in the cluster.

Reviewers: David Arthur <mumrah@gmail.com>, Ziming Deng <dengziming1993@gmail.com>, Luke Chen <showuon@gmail.com>
case Some(value) =>
value match {
case ZkCachedControllerId (id) => id
case KRaftCachedControllerId (_) => metadataCache.getRandomAliveBrokerId.getOrElse(- 1)
Copy link
Copy Markdown
Contributor

@bachmanity1 bachmanity1 Aug 20, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi @cmccabe,
Thanks for the PR! Sorry for revisiting this, but since this PR allows clients to communicate directly with controllers, wouldn't it make more sense to return the active controller ID here instead of a random broker ID?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

Projects

None yet

Development

Successfully merging this pull request may close these issues.

6 participants