Skip to content
Merged
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.function.Supplier;
import java.util.function.Function;

public class ChannelBuilders {
private static final Logger log = LoggerFactory.getLogger(ChannelBuilders.class);
Expand Down Expand Up @@ -104,7 +104,7 @@ public static ChannelBuilder serverChannelBuilder(ListenerName listenerName,
DelegationTokenCache tokenCache,
Time time,
LogContext logContext,
Supplier<ApiVersionsResponse> apiVersionSupplier) {
Function<Short, ApiVersionsResponse> apiVersionSupplier) {
return create(securityProtocol, ConnectionMode.SERVER, JaasContext.Type.SERVER, config, listenerName,
isInterBrokerListener, null, true, credentialCache,
tokenCache, time, logContext, apiVersionSupplier);
Expand All @@ -122,7 +122,7 @@ private static ChannelBuilder create(SecurityProtocol securityProtocol,
DelegationTokenCache tokenCache,
Time time,
LogContext logContext,
Supplier<ApiVersionsResponse> apiVersionSupplier) {
Function<Short, ApiVersionsResponse> apiVersionSupplier) {
Map<String, Object> configs = channelBuilderConfigs(config, listenerName);

ChannelBuilder channelBuilder;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Function;
import java.util.function.Supplier;

import javax.security.auth.Subject;
Expand All @@ -88,7 +89,7 @@ public class SaslChannelBuilder implements ChannelBuilder, ListenerReconfigurabl
private final DelegationTokenCache tokenCache;
private final Map<String, LoginManager> loginManagers;
private final Map<String, Subject> subjects;
private final Supplier<ApiVersionsResponse> apiVersionSupplier;
private final Function<Short, ApiVersionsResponse> apiVersionSupplier;
private final String sslClientAuthOverride;
private final Map<String, AuthenticateCallbackHandler> saslCallbackHandlers;
private final Map<String, Long> connectionsMaxReauthMsByMechanism;
Expand All @@ -112,7 +113,7 @@ public SaslChannelBuilder(ConnectionMode connectionMode,
String sslClientAuthOverride,
Time time,
LogContext logContext,
Supplier<ApiVersionsResponse> apiVersionSupplier) {
Function<Short, ApiVersionsResponse> apiVersionSupplier) {
this.connectionMode = connectionMode;
this.jaasContexts = jaasContexts;
this.loginManagers = new HashMap<>(jaasContexts.size());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@

import java.nio.ByteBuffer;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;

Expand All @@ -49,6 +50,71 @@ public class ApiVersionsResponse extends AbstractResponse {

private final ApiVersionsResponseData data;

public static class Builder {
private Errors error = Errors.NONE;
private int throttleTimeMs = 0;
private ApiVersionCollection apiVersions = null;
private Features<SupportedVersionRange> supportedFeatures = null;
private Map<String, Short> finalizedFeatures = null;
private long finalizedFeaturesEpoch = 0;
private boolean zkMigrationEnabled = false;
private boolean alterFeatureLevel0 = false;

public Builder setError(Errors error) {
this.error = error;
return this;
}

public Builder setThrottleTimeMs(int throttleTimeMs) {
this.throttleTimeMs = throttleTimeMs;
return this;
}

public Builder setApiVersions(ApiVersionCollection apiVersions) {
this.apiVersions = apiVersions;
return this;
}

public Builder setSupportedFeatures(Features<SupportedVersionRange> supportedFeatures) {
this.supportedFeatures = supportedFeatures;
return this;
}

public Builder setFinalizedFeatures(Map<String, Short> finalizedFeatures) {
this.finalizedFeatures = finalizedFeatures;
return this;
}

public Builder setFinalizedFeaturesEpoch(long finalizedFeaturesEpoch) {
this.finalizedFeaturesEpoch = finalizedFeaturesEpoch;
return this;
}

public Builder setZkMigrationEnabled(boolean zkMigrationEnabled) {
this.zkMigrationEnabled = zkMigrationEnabled;
return this;
}

public Builder setAlterFeatureLevel0(boolean alterFeatureLevel0) {
this.alterFeatureLevel0 = alterFeatureLevel0;
return this;
}

public ApiVersionsResponse build() {
final ApiVersionsResponseData data = new ApiVersionsResponseData();
data.setErrorCode(error.code());
data.setApiKeys(Objects.requireNonNull(apiVersions));
data.setThrottleTimeMs(throttleTimeMs);
data.setSupportedFeatures(
maybeFilterSupportedFeatureKeys(Objects.requireNonNull(supportedFeatures), alterFeatureLevel0));
data.setFinalizedFeatures(
createFinalizedFeatureKeys(Objects.requireNonNull(finalizedFeatures)));
data.setFinalizedFeaturesEpoch(finalizedFeaturesEpoch);
data.setZkMigrationReady(zkMigrationEnabled);
return new ApiVersionsResponse(data);
}
}

public ApiVersionsResponse(ApiVersionsResponseData data) {
super(ApiKeys.API_VERSIONS);
this.data = data;
Expand Down Expand Up @@ -105,65 +171,32 @@ public static ApiVersionsResponse parse(ByteBuffer buffer, short version) {
}
}

public static ApiVersionsResponse createApiVersionsResponse(
int throttleTimeMs,
public static ApiVersionCollection controllerApiVersions(
RecordVersion minRecordVersion,
Features<SupportedVersionRange> latestSupportedFeatures,
Map<String, Short> finalizedFeatures,
long finalizedFeaturesEpoch,
NodeApiVersions controllerApiVersions,
ListenerType listenerType,
boolean enableUnstableLastVersion,
boolean zkMigrationEnabled,
boolean clientTelemetryEnabled
) {
ApiVersionCollection apiKeys;
if (controllerApiVersions != null) {
apiKeys = intersectForwardableApis(
listenerType,
minRecordVersion,
controllerApiVersions.allSupportedApiVersions(),
enableUnstableLastVersion,
clientTelemetryEnabled
);
} else {
apiKeys = filterApis(
minRecordVersion,
listenerType,
enableUnstableLastVersion,
clientTelemetryEnabled
);
}

return createApiVersionsResponse(
throttleTimeMs,
apiKeys,
latestSupportedFeatures,
finalizedFeatures,
finalizedFeaturesEpoch,
zkMigrationEnabled
);
return intersectForwardableApis(
listenerType,
minRecordVersion,
controllerApiVersions.allSupportedApiVersions(),
enableUnstableLastVersion,
clientTelemetryEnabled);
}

public static ApiVersionsResponse createApiVersionsResponse(
int throttleTimeMs,
ApiVersionCollection apiVersions,
Features<SupportedVersionRange> latestSupportedFeatures,
Map<String, Short> finalizedFeatures,
long finalizedFeaturesEpoch,
boolean zkMigrationEnabled
public static ApiVersionCollection brokerApiVersions(
RecordVersion minRecordVersion,
ListenerType listenerType,
boolean enableUnstableLastVersion,
boolean clientTelemetryEnabled
) {
return new ApiVersionsResponse(
createApiVersionsResponseData(
throttleTimeMs,
Errors.NONE,
apiVersions,
latestSupportedFeatures,
finalizedFeatures,
finalizedFeaturesEpoch,
zkMigrationEnabled
)
);
return filterApis(
minRecordVersion,
listenerType,
enableUnstableLastVersion,
clientTelemetryEnabled);
}

public static ApiVersionCollection filterApis(
Expand Down Expand Up @@ -249,35 +282,24 @@ public static ApiVersionCollection intersectForwardableApis(
return apiKeys;
}

private static ApiVersionsResponseData createApiVersionsResponseData(
final int throttleTimeMs,
final Errors error,
final ApiVersionCollection apiKeys,
final Features<SupportedVersionRange> latestSupportedFeatures,
final Map<String, Short> finalizedFeatures,
final long finalizedFeaturesEpoch,
final boolean zkMigrationEnabled
private static SupportedFeatureKeyCollection maybeFilterSupportedFeatureKeys(
Features<SupportedVersionRange> latestSupportedFeatures,
Comment thread
cmccabe marked this conversation as resolved.
boolean alterV0
) {
final ApiVersionsResponseData data = new ApiVersionsResponseData();
data.setThrottleTimeMs(throttleTimeMs);
data.setErrorCode(error.code());
data.setApiKeys(apiKeys);
data.setSupportedFeatures(createSupportedFeatureKeys(latestSupportedFeatures));
data.setFinalizedFeatures(createFinalizedFeatureKeys(finalizedFeatures));
data.setFinalizedFeaturesEpoch(finalizedFeaturesEpoch);
data.setZkMigrationReady(zkMigrationEnabled);

return data;
}

private static SupportedFeatureKeyCollection createSupportedFeatureKeys(
Features<SupportedVersionRange> latestSupportedFeatures) {
SupportedFeatureKeyCollection converted = new SupportedFeatureKeyCollection();
for (Map.Entry<String, SupportedVersionRange> feature : latestSupportedFeatures.features().entrySet()) {
final SupportedFeatureKey key = new SupportedFeatureKey();
final SupportedVersionRange versionRange = feature.getValue();
final SupportedFeatureKey key = new SupportedFeatureKey();
key.setName(feature.getKey());
key.setMinVersion(versionRange.min());
if (alterV0 && versionRange.min() == 0) {
// Some older clients will have deserialization problems if a feature's
// minimum supported level is 0. Therefore, when preparing ApiVersionResponse
// at versions less than 4, we must set the minimum version for these features
// to 1 rather than 0. See KAFKA-17011 for details.
key.setMinVersion((short) 1);
} else {
key.setMinVersion(versionRange.min());
}
key.setMaxVersion(versionRange.max());
converted.add(key);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.apache.kafka.common.protocol.Errors;

import java.nio.ByteBuffer;
import java.util.Iterator;

public class BrokerRegistrationRequest extends AbstractRequest {

Expand All @@ -45,7 +46,21 @@ public short oldestAllowedVersion() {

@Override
public BrokerRegistrationRequest build(short version) {
return new BrokerRegistrationRequest(data, version);
if (version < 4) {
// Workaround for KAFKA-17011: for BrokerRegistrationRequest versions older than 4,
// translate minSupportedVersion = 0 to minSupportedVersion = 1.
BrokerRegistrationRequestData newData = data.duplicate();
for (Iterator<BrokerRegistrationRequestData.Feature> iter = newData.features().iterator();
Comment thread
chia7712 marked this conversation as resolved.
iter.hasNext(); ) {
BrokerRegistrationRequestData.Feature feature = iter.next();
if (feature.minSupportedVersion() == 0) {
feature.setMinSupportedVersion((short) 1);
Copy link
Copy Markdown
Member

@chia7712 chia7712 Sep 5, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is another issue caused by changing the version from 0 to 1. #15685 add the check which assumes the "default version" is 0

https://github.com/apache/kafka/blob/3.8/metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java#L470

Hence, the features having min=1 can NOT pass the check. see following error message:

[2024-09-05 11:22:22,146] INFO [QuorumController id=2] registerBroker: event failed with UnsupportedVersionException in 127 microseconds. Exception message: Unable to register because the broker does not support version 0 of group.version. It wants a version between 1 and 1, inclusive. (org.apache.kafka.controller.QuorumController)

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@chia7712 Thanks. Is there a way to reproduce this (e.g. an existing test)?

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there a way to reproduce this (e.g. an existing test)?

run the following command with #17084

TC_PATHS="tests/kafkatest/tests/core/kraft_upgrade_test.py::TestKRaftUpgrade.test_combined_mode_upgrade" \
/bin/bash tests/docker/run_tests.sh

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Originally I thought that we could not set version 0 with the storage or update tools since it would treat it as disabling the feature. But @dajac pointed out to me that this is happening by setting the MV and that picks default features.

However, the min version 1 should only be the case for older request versions. I thought we fixed this to so we wouldn't have the requirement for version 4 and for 4.0 where group version is introduced.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When I last talked to Colin about this, I believe he said we could not disable the feature if the min version is > 0.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When I last talked to Colin about this, I believe he said we could not disable the feature if the min version is > 0.

@cmccabe : Could you confirm if this is case? How would people upgrade from an old release where a feature is not turned on?

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would suspect you have to upgrade the feature before the upgrade to the new release, but we can let Colin confirm.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Checked with Colin offline. To summarize, if we do increase minVersion in the future, we need to bump up the default version for that feature. An old release may need to first upgrade to a bridge release where the new default version could be set. Given that, we don't need to change the logic in ClusterControlManager.

Copy link
Copy Markdown
Member

@chia7712 chia7712 Sep 9, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Given that, we don't need to change the logic in ClusterControlManager.

I will sync it to KAFKA-17429 and #17128

}
}
return new BrokerRegistrationRequest(newData, version);
} else {
return new BrokerRegistrationRequest(data, version);
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.function.Supplier;
import java.util.function.Function;

import javax.net.ssl.SSLSession;
import javax.security.auth.Subject;
Expand Down Expand Up @@ -129,7 +129,7 @@ private enum SaslState {
private final Time time;
private final ReauthInfo reauthInfo;
private final ChannelMetadataRegistry metadataRegistry;
private final Supplier<ApiVersionsResponse> apiVersionSupplier;
private final Function<Short, ApiVersionsResponse> apiVersionSupplier;

// Current SASL state
private SaslState saslState = SaslState.INITIAL_REQUEST;
Expand Down Expand Up @@ -159,7 +159,7 @@ public SaslServerAuthenticator(Map<String, ?> configs,
Map<String, Long> connectionsMaxReauthMsByMechanism,
ChannelMetadataRegistry metadataRegistry,
Time time,
Supplier<ApiVersionsResponse> apiVersionSupplier) {
Function<Short, ApiVersionsResponse> apiVersionSupplier) {
this.callbackHandlers = callbackHandlers;
this.connectionId = connectionId;
this.subjects = subjects;
Expand Down Expand Up @@ -596,7 +596,7 @@ else if (!apiVersionsRequest.isValid())
else {
metadataRegistry.registerClientInformation(new ClientInformation(apiVersionsRequest.data().clientSoftwareName(),
apiVersionsRequest.data().clientSoftwareVersion()));
sendKafkaResponse(context, apiVersionSupplier.get());
sendKafkaResponse(context, apiVersionSupplier.apply(apiVersionsRequest.version()));
setSaslState(SaslState.HANDSHAKE_REQUEST);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,9 @@
// Versions 0 through 2 of ApiVersionsRequest are the same.
//
// Version 3 is the first flexible version and adds ClientSoftwareName and ClientSoftwareVersion.
"validVersions": "0-3",
//
// Version 4 fixes KAFKA-17011, which blocked SupportedFeatures.MinVersion in the response from being 0.
"validVersions": "0-4",
"flexibleVersions": "3+",
"fields": [
{ "name": "ClientSoftwareName", "type": "string", "versions": "3+",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,9 @@
//
// Starting from Apache Kafka 2.4 (KIP-511), ApiKeys field is populated with the supported
// versions of the ApiVersionsRequest when an UNSUPPORTED_VERSION error is returned.
"validVersions": "0-3",
//
// Version 4 fixes KAFKA-17011, which blocked SupportedFeatures.MinVersion from being 0.
"validVersions": "0-4",
"flexibleVersions": "3+",
"fields": [
{ "name": "ErrorCode", "type": "int16", "versions": "0+",
Expand All @@ -45,7 +47,7 @@
"about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." },
{ "name": "SupportedFeatures", "type": "[]SupportedFeatureKey", "ignorable": true,
"versions": "3+", "tag": 0, "taggedVersions": "3+",
"about": "Features supported by the broker.",
"about": "Features supported by the broker. Note: in v0-v3, features with MinSupportedVersion = 0 show up with MinSupportedVersion = 1.",
"fields": [
{ "name": "Name", "type": "string", "versions": "3+", "mapKey": true,
"about": "The name of the feature." },
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,15 @@
// Version 2 adds LogDirs for KIP-858

// Version 3 adds the PreviousBrokerEpoch for the KIP-966

// Version 4 fixes KAFKA-17011, which blocked SupportedFeatures.MinVersion in the response from being 0.

{
"apiKey":62,
"type": "request",
"listeners": ["controller"],
"name": "BrokerRegistrationRequest",
"validVersions": "0-3",
"validVersions": "0-4",
Comment thread
cmccabe marked this conversation as resolved.
"flexibleVersions": "0+",
"fields": [
{ "name": "BrokerId", "type": "int32", "versions": "0+", "entityType": "brokerId",
Expand All @@ -45,7 +48,7 @@
]
},
{ "name": "Features", "type": "[]Feature",
"about": "The features on this broker", "versions": "0+", "fields": [
"about": "The features on this broker. Note: in v0-v3, features with MinSupportedVersion = 0 show up with MinSupportedVersion = 1.", "versions": "0+", "fields": [
{ "name": "Name", "type": "string", "versions": "0+", "mapKey": true,
"about": "The feature name." },
{ "name": "MinSupportedVersion", "type": "int16", "versions": "0+",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
"apiKey": 62,
"type": "response",
"name": "BrokerRegistrationResponse",
"validVersions": "0-3",
"validVersions": "0-4",
"flexibleVersions": "0+",
"fields": [
{ "name": "ThrottleTimeMs", "type": "int32", "versions": "0+",
Expand Down
Loading