diff --git a/clients/src/main/java/org/apache/kafka/common/network/ChannelBuilders.java b/clients/src/main/java/org/apache/kafka/common/network/ChannelBuilders.java index 292ec4c41ad8f..0bf64c77c6717 100644 --- a/clients/src/main/java/org/apache/kafka/common/network/ChannelBuilders.java +++ b/clients/src/main/java/org/apache/kafka/common/network/ChannelBuilders.java @@ -42,7 +42,7 @@ import java.util.List; import java.util.Locale; import java.util.Map; -import java.util.function.Supplier; +import java.util.function.Function; public class ChannelBuilders { private static final Logger log = LoggerFactory.getLogger(ChannelBuilders.class); @@ -104,7 +104,7 @@ public static ChannelBuilder serverChannelBuilder(ListenerName listenerName, DelegationTokenCache tokenCache, Time time, LogContext logContext, - Supplier apiVersionSupplier) { + Function apiVersionSupplier) { return create(securityProtocol, ConnectionMode.SERVER, JaasContext.Type.SERVER, config, listenerName, isInterBrokerListener, null, true, credentialCache, tokenCache, time, logContext, apiVersionSupplier); @@ -122,7 +122,7 @@ private static ChannelBuilder create(SecurityProtocol securityProtocol, DelegationTokenCache tokenCache, Time time, LogContext logContext, - Supplier apiVersionSupplier) { + Function apiVersionSupplier) { Map configs = channelBuilderConfigs(config, listenerName); ChannelBuilder channelBuilder; diff --git a/clients/src/main/java/org/apache/kafka/common/network/SaslChannelBuilder.java b/clients/src/main/java/org/apache/kafka/common/network/SaslChannelBuilder.java index 295d7dbebabb1..c8c621698c06a 100644 --- a/clients/src/main/java/org/apache/kafka/common/network/SaslChannelBuilder.java +++ b/clients/src/main/java/org/apache/kafka/common/network/SaslChannelBuilder.java @@ -69,6 +69,7 @@ import java.util.List; import java.util.Map; import java.util.Set; +import java.util.function.Function; import java.util.function.Supplier; import javax.security.auth.Subject; @@ -88,7 +89,7 @@ public class SaslChannelBuilder implements ChannelBuilder, ListenerReconfigurabl private final DelegationTokenCache tokenCache; private final Map loginManagers; private final Map subjects; - private final Supplier apiVersionSupplier; + private final Function apiVersionSupplier; private final String sslClientAuthOverride; private final Map saslCallbackHandlers; private final Map connectionsMaxReauthMsByMechanism; @@ -112,7 +113,7 @@ public SaslChannelBuilder(ConnectionMode connectionMode, String sslClientAuthOverride, Time time, LogContext logContext, - Supplier apiVersionSupplier) { + Function apiVersionSupplier) { this.connectionMode = connectionMode; this.jaasContexts = jaasContexts; this.loginManagers = new HashMap<>(jaasContexts.size()); diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ApiVersionsResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/ApiVersionsResponse.java index 04ee2014d1d44..a7e4a27c44321 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/ApiVersionsResponse.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/ApiVersionsResponse.java @@ -35,6 +35,7 @@ import java.nio.ByteBuffer; import java.util.Map; +import java.util.Objects; import java.util.Optional; import java.util.Set; @@ -49,6 +50,71 @@ public class ApiVersionsResponse extends AbstractResponse { private final ApiVersionsResponseData data; + public static class Builder { + private Errors error = Errors.NONE; + private int throttleTimeMs = 0; + private ApiVersionCollection apiVersions = null; + private Features supportedFeatures = null; + private Map finalizedFeatures = null; + private long finalizedFeaturesEpoch = 0; + private boolean zkMigrationEnabled = false; + private boolean alterFeatureLevel0 = false; + + public Builder setError(Errors error) { + this.error = error; + return this; + } + + public Builder setThrottleTimeMs(int throttleTimeMs) { + this.throttleTimeMs = throttleTimeMs; + return this; + } + + public Builder setApiVersions(ApiVersionCollection apiVersions) { + this.apiVersions = apiVersions; + return this; + } + + public Builder setSupportedFeatures(Features supportedFeatures) { + this.supportedFeatures = supportedFeatures; + return this; + } + + public Builder setFinalizedFeatures(Map finalizedFeatures) { + this.finalizedFeatures = finalizedFeatures; + return this; + } + + public Builder setFinalizedFeaturesEpoch(long finalizedFeaturesEpoch) { + this.finalizedFeaturesEpoch = finalizedFeaturesEpoch; + return this; + } + + public Builder setZkMigrationEnabled(boolean zkMigrationEnabled) { + this.zkMigrationEnabled = zkMigrationEnabled; + return this; + } + + public Builder setAlterFeatureLevel0(boolean alterFeatureLevel0) { + this.alterFeatureLevel0 = alterFeatureLevel0; + return this; + } + + public ApiVersionsResponse build() { + final ApiVersionsResponseData data = new ApiVersionsResponseData(); + data.setErrorCode(error.code()); + data.setApiKeys(Objects.requireNonNull(apiVersions)); + data.setThrottleTimeMs(throttleTimeMs); + data.setSupportedFeatures( + maybeFilterSupportedFeatureKeys(Objects.requireNonNull(supportedFeatures), alterFeatureLevel0)); + data.setFinalizedFeatures( + createFinalizedFeatureKeys(Objects.requireNonNull(finalizedFeatures))); + data.setFinalizedFeaturesEpoch(finalizedFeaturesEpoch); + data.setZkMigrationReady(zkMigrationEnabled); + return new ApiVersionsResponse(data); + } + } + public ApiVersionsResponse(ApiVersionsResponseData data) { super(ApiKeys.API_VERSIONS); this.data = data; @@ -105,65 +171,32 @@ public static ApiVersionsResponse parse(ByteBuffer buffer, short version) { } } - public static ApiVersionsResponse createApiVersionsResponse( - int throttleTimeMs, + public static ApiVersionCollection controllerApiVersions( RecordVersion minRecordVersion, - Features latestSupportedFeatures, - Map finalizedFeatures, - long finalizedFeaturesEpoch, NodeApiVersions controllerApiVersions, ListenerType listenerType, boolean enableUnstableLastVersion, - boolean zkMigrationEnabled, boolean clientTelemetryEnabled ) { - ApiVersionCollection apiKeys; - if (controllerApiVersions != null) { - apiKeys = intersectForwardableApis( - listenerType, - minRecordVersion, - controllerApiVersions.allSupportedApiVersions(), - enableUnstableLastVersion, - clientTelemetryEnabled - ); - } else { - apiKeys = filterApis( - minRecordVersion, - listenerType, - enableUnstableLastVersion, - clientTelemetryEnabled - ); - } - - return createApiVersionsResponse( - throttleTimeMs, - apiKeys, - latestSupportedFeatures, - finalizedFeatures, - finalizedFeaturesEpoch, - zkMigrationEnabled - ); + return intersectForwardableApis( + listenerType, + minRecordVersion, + controllerApiVersions.allSupportedApiVersions(), + enableUnstableLastVersion, + clientTelemetryEnabled); } - public static ApiVersionsResponse createApiVersionsResponse( - int throttleTimeMs, - ApiVersionCollection apiVersions, - Features latestSupportedFeatures, - Map finalizedFeatures, - long finalizedFeaturesEpoch, - boolean zkMigrationEnabled + public static ApiVersionCollection brokerApiVersions( + RecordVersion minRecordVersion, + ListenerType listenerType, + boolean enableUnstableLastVersion, + boolean clientTelemetryEnabled ) { - return new ApiVersionsResponse( - createApiVersionsResponseData( - throttleTimeMs, - Errors.NONE, - apiVersions, - latestSupportedFeatures, - finalizedFeatures, - finalizedFeaturesEpoch, - zkMigrationEnabled - ) - ); + return filterApis( + minRecordVersion, + listenerType, + enableUnstableLastVersion, + clientTelemetryEnabled); } public static ApiVersionCollection filterApis( @@ -249,35 +282,24 @@ public static ApiVersionCollection intersectForwardableApis( return apiKeys; } - private static ApiVersionsResponseData createApiVersionsResponseData( - final int throttleTimeMs, - final Errors error, - final ApiVersionCollection apiKeys, - final Features latestSupportedFeatures, - final Map finalizedFeatures, - final long finalizedFeaturesEpoch, - final boolean zkMigrationEnabled + private static SupportedFeatureKeyCollection maybeFilterSupportedFeatureKeys( + Features latestSupportedFeatures, + boolean alterV0 ) { - final ApiVersionsResponseData data = new ApiVersionsResponseData(); - data.setThrottleTimeMs(throttleTimeMs); - data.setErrorCode(error.code()); - data.setApiKeys(apiKeys); - data.setSupportedFeatures(createSupportedFeatureKeys(latestSupportedFeatures)); - data.setFinalizedFeatures(createFinalizedFeatureKeys(finalizedFeatures)); - data.setFinalizedFeaturesEpoch(finalizedFeaturesEpoch); - data.setZkMigrationReady(zkMigrationEnabled); - - return data; - } - - private static SupportedFeatureKeyCollection createSupportedFeatureKeys( - Features latestSupportedFeatures) { SupportedFeatureKeyCollection converted = new SupportedFeatureKeyCollection(); for (Map.Entry feature : latestSupportedFeatures.features().entrySet()) { - final SupportedFeatureKey key = new SupportedFeatureKey(); final SupportedVersionRange versionRange = feature.getValue(); + final SupportedFeatureKey key = new SupportedFeatureKey(); key.setName(feature.getKey()); - key.setMinVersion(versionRange.min()); + if (alterV0 && versionRange.min() == 0) { + // Some older clients will have deserialization problems if a feature's + // minimum supported level is 0. Therefore, when preparing ApiVersionResponse + // at versions less than 4, we must set the minimum version for these features + // to 1 rather than 0. See KAFKA-17011 for details. + key.setMinVersion((short) 1); + } else { + key.setMinVersion(versionRange.min()); + } key.setMaxVersion(versionRange.max()); converted.add(key); } diff --git a/clients/src/main/java/org/apache/kafka/common/requests/BrokerRegistrationRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/BrokerRegistrationRequest.java index 18d6a070d0557..a75616eb7160b 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/BrokerRegistrationRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/BrokerRegistrationRequest.java @@ -23,6 +23,7 @@ import org.apache.kafka.common.protocol.Errors; import java.nio.ByteBuffer; +import java.util.Iterator; public class BrokerRegistrationRequest extends AbstractRequest { @@ -45,7 +46,21 @@ public short oldestAllowedVersion() { @Override public BrokerRegistrationRequest build(short version) { - return new BrokerRegistrationRequest(data, version); + if (version < 4) { + // Workaround for KAFKA-17011: for BrokerRegistrationRequest versions older than 4, + // translate minSupportedVersion = 0 to minSupportedVersion = 1. + BrokerRegistrationRequestData newData = data.duplicate(); + for (Iterator iter = newData.features().iterator(); + iter.hasNext(); ) { + BrokerRegistrationRequestData.Feature feature = iter.next(); + if (feature.minSupportedVersion() == 0) { + feature.setMinSupportedVersion((short) 1); + } + } + return new BrokerRegistrationRequest(newData, version); + } else { + return new BrokerRegistrationRequest(data, version); + } } @Override diff --git a/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticator.java b/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticator.java index d44a0a64296a1..43c6ed683d963 100644 --- a/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticator.java +++ b/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticator.java @@ -82,7 +82,7 @@ import java.util.Map; import java.util.Objects; import java.util.Optional; -import java.util.function.Supplier; +import java.util.function.Function; import javax.net.ssl.SSLSession; import javax.security.auth.Subject; @@ -129,7 +129,7 @@ private enum SaslState { private final Time time; private final ReauthInfo reauthInfo; private final ChannelMetadataRegistry metadataRegistry; - private final Supplier apiVersionSupplier; + private final Function apiVersionSupplier; // Current SASL state private SaslState saslState = SaslState.INITIAL_REQUEST; @@ -159,7 +159,7 @@ public SaslServerAuthenticator(Map configs, Map connectionsMaxReauthMsByMechanism, ChannelMetadataRegistry metadataRegistry, Time time, - Supplier apiVersionSupplier) { + Function apiVersionSupplier) { this.callbackHandlers = callbackHandlers; this.connectionId = connectionId; this.subjects = subjects; @@ -596,7 +596,7 @@ else if (!apiVersionsRequest.isValid()) else { metadataRegistry.registerClientInformation(new ClientInformation(apiVersionsRequest.data().clientSoftwareName(), apiVersionsRequest.data().clientSoftwareVersion())); - sendKafkaResponse(context, apiVersionSupplier.get()); + sendKafkaResponse(context, apiVersionSupplier.apply(apiVersionsRequest.version())); setSaslState(SaslState.HANDSHAKE_REQUEST); } } diff --git a/clients/src/main/resources/common/message/ApiVersionsRequest.json b/clients/src/main/resources/common/message/ApiVersionsRequest.json index b86edbfaaec12..050dbcfd3f2c9 100644 --- a/clients/src/main/resources/common/message/ApiVersionsRequest.json +++ b/clients/src/main/resources/common/message/ApiVersionsRequest.json @@ -21,7 +21,9 @@ // Versions 0 through 2 of ApiVersionsRequest are the same. // // Version 3 is the first flexible version and adds ClientSoftwareName and ClientSoftwareVersion. - "validVersions": "0-3", + // + // Version 4 fixes KAFKA-17011, which blocked SupportedFeatures.MinVersion in the response from being 0. + "validVersions": "0-4", "flexibleVersions": "3+", "fields": [ { "name": "ClientSoftwareName", "type": "string", "versions": "3+", diff --git a/clients/src/main/resources/common/message/ApiVersionsResponse.json b/clients/src/main/resources/common/message/ApiVersionsResponse.json index 9fda953e10e57..570891dd3cc83 100644 --- a/clients/src/main/resources/common/message/ApiVersionsResponse.json +++ b/clients/src/main/resources/common/message/ApiVersionsResponse.json @@ -27,7 +27,9 @@ // // Starting from Apache Kafka 2.4 (KIP-511), ApiKeys field is populated with the supported // versions of the ApiVersionsRequest when an UNSUPPORTED_VERSION error is returned. - "validVersions": "0-3", + // + // Version 4 fixes KAFKA-17011, which blocked SupportedFeatures.MinVersion from being 0. + "validVersions": "0-4", "flexibleVersions": "3+", "fields": [ { "name": "ErrorCode", "type": "int16", "versions": "0+", @@ -45,7 +47,7 @@ "about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." }, { "name": "SupportedFeatures", "type": "[]SupportedFeatureKey", "ignorable": true, "versions": "3+", "tag": 0, "taggedVersions": "3+", - "about": "Features supported by the broker.", + "about": "Features supported by the broker. Note: in v0-v3, features with MinSupportedVersion = 0 show up with MinSupportedVersion = 1.", "fields": [ { "name": "Name", "type": "string", "versions": "3+", "mapKey": true, "about": "The name of the feature." }, diff --git a/clients/src/main/resources/common/message/BrokerRegistrationRequest.json b/clients/src/main/resources/common/message/BrokerRegistrationRequest.json index 9343465ec8276..294e8549db713 100644 --- a/clients/src/main/resources/common/message/BrokerRegistrationRequest.json +++ b/clients/src/main/resources/common/message/BrokerRegistrationRequest.json @@ -18,12 +18,15 @@ // Version 2 adds LogDirs for KIP-858 // Version 3 adds the PreviousBrokerEpoch for the KIP-966 + +// Version 4 fixes KAFKA-17011, which blocked SupportedFeatures.MinVersion in the response from being 0. + { "apiKey":62, "type": "request", "listeners": ["controller"], "name": "BrokerRegistrationRequest", - "validVersions": "0-3", + "validVersions": "0-4", "flexibleVersions": "0+", "fields": [ { "name": "BrokerId", "type": "int32", "versions": "0+", "entityType": "brokerId", @@ -45,7 +48,7 @@ ] }, { "name": "Features", "type": "[]Feature", - "about": "The features on this broker", "versions": "0+", "fields": [ + "about": "The features on this broker. Note: in v0-v3, features with MinSupportedVersion = 0 show up with MinSupportedVersion = 1.", "versions": "0+", "fields": [ { "name": "Name", "type": "string", "versions": "0+", "mapKey": true, "about": "The feature name." }, { "name": "MinSupportedVersion", "type": "int16", "versions": "0+", diff --git a/clients/src/main/resources/common/message/BrokerRegistrationResponse.json b/clients/src/main/resources/common/message/BrokerRegistrationResponse.json index 1f2ff94162f6c..dbc15cc400357 100644 --- a/clients/src/main/resources/common/message/BrokerRegistrationResponse.json +++ b/clients/src/main/resources/common/message/BrokerRegistrationResponse.json @@ -20,7 +20,7 @@ "apiKey": 62, "type": "response", "name": "BrokerRegistrationResponse", - "validVersions": "0-3", + "validVersions": "0-4", "flexibleVersions": "0+", "fields": [ { "name": "ThrottleTimeMs", "type": "int32", "versions": "0+", diff --git a/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java b/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java index 18442a2f928ca..89f567157c3ad 100644 --- a/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java @@ -358,7 +358,7 @@ public void testUnsupportedApiVersionsRequestWithVersionProvidedByTheBroker() { ByteBuffer buffer = selector.completedSendBuffers().get(0).buffer(); RequestHeader header = parseHeader(buffer); assertEquals(ApiKeys.API_VERSIONS, header.apiKey()); - assertEquals(3, header.apiVersion()); + assertEquals(4, header.apiVersion()); // prepare response ApiVersionCollection apiKeys = new ApiVersionCollection(); @@ -430,7 +430,7 @@ public void testUnsupportedApiVersionsRequestWithoutVersionProvidedByTheBroker() ByteBuffer buffer = selector.completedSendBuffers().get(0).buffer(); RequestHeader header = parseHeader(buffer); assertEquals(ApiKeys.API_VERSIONS, header.apiKey()); - assertEquals(3, header.apiVersion()); + assertEquals(4, header.apiVersion()); // prepare response delayedApiVersionsResponse(0, (short) 0, diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java index f19462918e501..34c2722d46360 100644 --- a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java @@ -717,14 +717,16 @@ private static Features c private static ApiVersionsResponse prepareApiVersionsResponseForDescribeFeatures(Errors error) { if (error == Errors.NONE) { - return ApiVersionsResponse.createApiVersionsResponse( - 0, - ApiVersionsResponse.filterApis(RecordVersion.current(), ApiMessageType.ListenerType.ZK_BROKER, false, false), - convertSupportedFeaturesMap(defaultFeatureMetadata().supportedFeatures()), - Collections.singletonMap("test_feature_1", (short) 2), - defaultFeatureMetadata().finalizedFeaturesEpoch().get(), - false - ); + return new ApiVersionsResponse.Builder(). + setApiVersions(ApiVersionsResponse.filterApis( + RecordVersion.current(), ApiMessageType.ListenerType.ZK_BROKER, false, false)). + setSupportedFeatures( + convertSupportedFeaturesMap(defaultFeatureMetadata().supportedFeatures())). + setFinalizedFeatures( + Collections.singletonMap("test_feature_1", (short) 2)). + setFinalizedFeaturesEpoch( + defaultFeatureMetadata().finalizedFeaturesEpoch().get()). + build(); } return new ApiVersionsResponse( new ApiVersionsResponseData() diff --git a/clients/src/test/java/org/apache/kafka/common/network/NioEchoServer.java b/clients/src/test/java/org/apache/kafka/common/network/NioEchoServer.java index 512a98e5ccfc2..42424a9dacdd2 100644 --- a/clients/src/test/java/org/apache/kafka/common/network/NioEchoServer.java +++ b/clients/src/test/java/org/apache/kafka/common/network/NioEchoServer.java @@ -130,7 +130,7 @@ public NioEchoServer(ListenerName listenerName, SecurityProtocol securityProtoco if (channelBuilder == null) channelBuilder = ChannelBuilders.serverChannelBuilder(listenerName, false, securityProtocol, config, credentialCache, tokenCache, time, logContext, - () -> TestUtils.defaultApiVersionsResponse(ApiMessageType.ListenerType.ZK_BROKER)); + version -> TestUtils.defaultApiVersionsResponse(ApiMessageType.ListenerType.ZK_BROKER)); this.metrics = new Metrics(); this.selector = new Selector(10000, failedAuthenticationDelayMs, metrics, time, "MetricGroup", channelBuilder, logContext); diff --git a/clients/src/test/java/org/apache/kafka/common/network/SaslChannelBuilderTest.java b/clients/src/test/java/org/apache/kafka/common/network/SaslChannelBuilderTest.java index f8ba8dc05b3e8..ad94ae1dcd9f7 100644 --- a/clients/src/test/java/org/apache/kafka/common/network/SaslChannelBuilderTest.java +++ b/clients/src/test/java/org/apache/kafka/common/network/SaslChannelBuilderTest.java @@ -48,7 +48,7 @@ import java.util.Collections; import java.util.HashMap; import java.util.Map; -import java.util.function.Supplier; +import java.util.function.Function; import javax.security.auth.Subject; import javax.security.auth.callback.CallbackHandler; @@ -179,8 +179,8 @@ protected GSSManager gssManager() { return channelBuilder; } - private Supplier defaultApiVersionsSupplier() { - return () -> TestUtils.defaultApiVersionsResponse(ApiMessageType.ListenerType.ZK_BROKER); + private Function defaultApiVersionsSupplier() { + return version -> TestUtils.defaultApiVersionsResponse(ApiMessageType.ListenerType.ZK_BROKER); } private SaslChannelBuilder createChannelBuilder(SecurityProtocol securityProtocol, String saslMechanism) { diff --git a/clients/src/test/java/org/apache/kafka/common/network/SslTransportLayerTest.java b/clients/src/test/java/org/apache/kafka/common/network/SslTransportLayerTest.java index c667fe5d453e5..8ad4dccc4f244 100644 --- a/clients/src/test/java/org/apache/kafka/common/network/SslTransportLayerTest.java +++ b/clients/src/test/java/org/apache/kafka/common/network/SslTransportLayerTest.java @@ -63,7 +63,7 @@ import java.util.Map; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; -import java.util.function.Supplier; +import java.util.function.Function; import java.util.stream.Stream; import javax.net.ssl.SSLContext; @@ -1352,8 +1352,8 @@ private interface FailureAction { void run() throws IOException; } - private Supplier defaultApiVersionsSupplier() { - return () -> TestUtils.defaultApiVersionsResponse(ApiMessageType.ListenerType.ZK_BROKER); + private Function defaultApiVersionsSupplier() { + return version -> TestUtils.defaultApiVersionsResponse(ApiMessageType.ListenerType.ZK_BROKER); } static class TestSslChannelBuilder extends SslChannelBuilder { diff --git a/clients/src/test/java/org/apache/kafka/common/requests/ApiVersionsResponseTest.java b/clients/src/test/java/org/apache/kafka/common/requests/ApiVersionsResponseTest.java index 65b7facd60209..af1dda65fc9fd 100644 --- a/clients/src/test/java/org/apache/kafka/common/requests/ApiVersionsResponseTest.java +++ b/clients/src/test/java/org/apache/kafka/common/requests/ApiVersionsResponseTest.java @@ -34,6 +34,7 @@ import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.EnumSource; +import org.junit.jupiter.params.provider.ValueSource; import java.util.Collections; import java.util.HashSet; @@ -116,18 +117,17 @@ public void shouldHaveCommonlyAgreedApiVersionResponseWithControllerOnForwardabl @Test public void shouldCreateApiResponseOnlyWithKeysSupportedByMagicValue() { - ApiVersionsResponse response = ApiVersionsResponse.createApiVersionsResponse( - 10, - RecordVersion.V1, - Features.emptySupportedFeatures(), - Collections.emptyMap(), - ApiVersionsResponse.UNKNOWN_FINALIZED_FEATURES_EPOCH, - null, - ListenerType.ZK_BROKER, - true, - false, - true - ); + ApiVersionsResponse response = new ApiVersionsResponse.Builder(). + setThrottleTimeMs(10). + setApiVersions(ApiVersionsResponse.filterApis( + RecordVersion.V1, + ListenerType.ZK_BROKER, + true, + true)). + setSupportedFeatures(Features.emptySupportedFeatures()). + setFinalizedFeatures(Collections.emptyMap()). + setFinalizedFeaturesEpoch(ApiVersionsResponse.UNKNOWN_FINALIZED_FEATURES_EPOCH). + build(); verifyApiKeysForMagic(response, RecordBatch.MAGIC_VALUE_V1); assertEquals(10, response.throttleTimeMs()); assertTrue(response.data().supportedFeatures().isEmpty()); @@ -137,19 +137,18 @@ public void shouldCreateApiResponseOnlyWithKeysSupportedByMagicValue() { @Test public void shouldReturnFeatureKeysWhenMagicIsCurrentValueAndThrottleMsIsDefaultThrottle() { - ApiVersionsResponse response = ApiVersionsResponse.createApiVersionsResponse( - 10, - RecordVersion.V1, - Features.supportedFeatures( - Utils.mkMap(Utils.mkEntry("feature", new SupportedVersionRange((short) 1, (short) 4)))), - Utils.mkMap(Utils.mkEntry("feature", (short) 3)), - 10L, - null, - ListenerType.ZK_BROKER, - true, - false, - true - ); + ApiVersionsResponse response = new ApiVersionsResponse.Builder(). + setThrottleTimeMs(10). + setApiVersions(ApiVersionsResponse.filterApis( + RecordVersion.V1, + ListenerType.ZK_BROKER, + true, + true)). + setSupportedFeatures(Features.supportedFeatures( + Utils.mkMap(Utils.mkEntry("feature", new SupportedVersionRange((short) 1, (short) 4))))). + setFinalizedFeatures(Utils.mkMap(Utils.mkEntry("feature", (short) 3))). + setFinalizedFeaturesEpoch(10L). + build(); verifyApiKeysForMagic(response, RecordBatch.MAGIC_VALUE_V1); assertEquals(10, response.throttleTimeMs()); @@ -169,18 +168,17 @@ public void shouldReturnFeatureKeysWhenMagicIsCurrentValueAndThrottleMsIsDefault @ParameterizedTest @EnumSource(names = {"ZK_BROKER", "BROKER"}) public void shouldReturnAllKeysWhenMagicIsCurrentValueAndThrottleMsIsDefaultThrottle(ListenerType listenerType) { - ApiVersionsResponse response = ApiVersionsResponse.createApiVersionsResponse( - AbstractResponse.DEFAULT_THROTTLE_TIME, - RecordVersion.current(), - Features.emptySupportedFeatures(), - Collections.emptyMap(), - ApiVersionsResponse.UNKNOWN_FINALIZED_FEATURES_EPOCH, - null, - listenerType, - true, - false, - true - ); + ApiVersionsResponse response = new ApiVersionsResponse.Builder(). + setThrottleTimeMs(AbstractResponse.DEFAULT_THROTTLE_TIME). + setApiVersions(ApiVersionsResponse.filterApis( + RecordVersion.current(), + listenerType, + true, + true)). + setSupportedFeatures(Features.emptySupportedFeatures()). + setFinalizedFeatures(Collections.emptyMap()). + setFinalizedFeaturesEpoch(ApiVersionsResponse.UNKNOWN_FINALIZED_FEATURES_EPOCH). + build(); assertEquals(new HashSet<>(ApiKeys.apisForListener(listenerType)), apiKeysInResponse(response)); assertEquals(AbstractResponse.DEFAULT_THROTTLE_TIME, response.throttleTimeMs()); assertTrue(response.data().supportedFeatures().isEmpty()); @@ -190,53 +188,49 @@ public void shouldReturnAllKeysWhenMagicIsCurrentValueAndThrottleMsIsDefaultThro @Test public void shouldCreateApiResponseWithTelemetryWhenEnabled() { - ApiVersionsResponse response = ApiVersionsResponse.createApiVersionsResponse( - 10, - RecordVersion.V1, - Features.emptySupportedFeatures(), - Collections.emptyMap(), - ApiVersionsResponse.UNKNOWN_FINALIZED_FEATURES_EPOCH, - null, - ListenerType.BROKER, - true, - false, - true - ); + ApiVersionsResponse response = new ApiVersionsResponse.Builder(). + setThrottleTimeMs(10). + setApiVersions(ApiVersionsResponse.filterApis( + RecordVersion.V1, + ListenerType.BROKER, + true, + true)). + setSupportedFeatures(Features.emptySupportedFeatures()). + setFinalizedFeatures(Collections.emptyMap()). + setFinalizedFeaturesEpoch(ApiVersionsResponse.UNKNOWN_FINALIZED_FEATURES_EPOCH). + build(); verifyApiKeysForTelemetry(response, 2); } @Test public void shouldNotCreateApiResponseWithTelemetryWhenDisabled() { - ApiVersionsResponse response = ApiVersionsResponse.createApiVersionsResponse( - 10, - RecordVersion.V1, - Features.emptySupportedFeatures(), - Collections.emptyMap(), - ApiVersionsResponse.UNKNOWN_FINALIZED_FEATURES_EPOCH, - null, - ListenerType.BROKER, - true, - false, - false - ); + ApiVersionsResponse response = new ApiVersionsResponse.Builder(). + setThrottleTimeMs(10). + setApiVersions(ApiVersionsResponse.filterApis( + RecordVersion.V1, + ListenerType.BROKER, + true, + false)). + setSupportedFeatures(Features.emptySupportedFeatures()). + setFinalizedFeatures(Collections.emptyMap()). + setFinalizedFeaturesEpoch(ApiVersionsResponse.UNKNOWN_FINALIZED_FEATURES_EPOCH). + build(); verifyApiKeysForTelemetry(response, 0); } @Test public void testMetadataQuorumApisAreDisabled() { - ApiVersionsResponse response = ApiVersionsResponse.createApiVersionsResponse( - AbstractResponse.DEFAULT_THROTTLE_TIME, - RecordVersion.current(), - Features.emptySupportedFeatures(), - Collections.emptyMap(), - ApiVersionsResponse.UNKNOWN_FINALIZED_FEATURES_EPOCH, - null, - ListenerType.ZK_BROKER, - true, - false, - true - ); - + ApiVersionsResponse response = new ApiVersionsResponse.Builder(). + setThrottleTimeMs(AbstractResponse.DEFAULT_THROTTLE_TIME). + setApiVersions(ApiVersionsResponse.filterApis( + RecordVersion.current(), + ListenerType.ZK_BROKER, + true, + true)). + setSupportedFeatures(Features.emptySupportedFeatures()). + setFinalizedFeatures(Collections.emptyMap()). + setFinalizedFeaturesEpoch(ApiVersionsResponse.UNKNOWN_FINALIZED_FEATURES_EPOCH). + build(); // Ensure that APIs needed for the KRaft mode are not exposed through ApiVersions until we are ready for them HashSet exposedApis = apiKeysInResponse(response); assertFalse(exposedApis.contains(ApiKeys.VOTE)); @@ -276,6 +270,38 @@ public void testIntersect() { assertEquals(expected, ApiVersionsResponse.intersect(other, thisVersion).get()); } + @ParameterizedTest + @ValueSource(booleans = {false, true}) + public void testAlterV0Features(boolean alterV0Features) { + Features supported = + Features.supportedFeatures(Collections.singletonMap("my.feature", + new SupportedVersionRange((short) 0, (short) 1))); + ApiVersionsResponse response = new ApiVersionsResponse.Builder(). + setApiVersions(ApiVersionsResponse.filterApis( + RecordVersion.current(), + ListenerType.BROKER, + true, + true)). + setSupportedFeatures(supported). + setFinalizedFeatures(Collections.emptyMap()). + setFinalizedFeaturesEpoch(ApiVersionsResponse.UNKNOWN_FINALIZED_FEATURES_EPOCH). + setAlterFeatureLevel0(alterV0Features). + build(); + if (alterV0Features) { + assertEquals(new SupportedFeatureKey(). + setName("my.feature"). + setMinVersion((short) 1). + setMaxVersion((short) 1), + response.data().supportedFeatures().find("my.feature")); + } else { + assertEquals(new SupportedFeatureKey(). + setName("my.feature"). + setMinVersion((short) 0). + setMaxVersion((short) 1), + response.data().supportedFeatures().find("my.feature")); + } + } + private void verifyVersions(short forwardableAPIKey, short minVersion, short maxVersion, diff --git a/clients/src/test/java/org/apache/kafka/common/requests/BrokerRegistrationRequestTest.java b/clients/src/test/java/org/apache/kafka/common/requests/BrokerRegistrationRequestTest.java index 5ba2856476def..14a6e3ca318aa 100644 --- a/clients/src/test/java/org/apache/kafka/common/requests/BrokerRegistrationRequestTest.java +++ b/clients/src/test/java/org/apache/kafka/common/requests/BrokerRegistrationRequestTest.java @@ -18,19 +18,24 @@ package org.apache.kafka.common.requests; import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.errors.UnsupportedVersionException; import org.apache.kafka.common.message.BrokerRegistrationRequestData; import org.apache.kafka.common.protocol.ByteBufferAccessor; import org.apache.kafka.common.protocol.ObjectSerializationCache; +import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.Arguments; import org.junit.jupiter.params.provider.MethodSource; import java.nio.ByteBuffer; +import java.util.Arrays; import java.util.stream.IntStream; import java.util.stream.Stream; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertThrows; class BrokerRegistrationRequestTest { private static Stream BrokerRegistrationRequestVersions() { @@ -38,6 +43,10 @@ private static Stream BrokerRegistrationRequestVersions() { BrokerRegistrationRequestData.HIGHEST_SUPPORTED_VERSION + 1).mapToObj(version -> Arguments.of((short) version)); } + private static Stream BrokerRegistrationRequestVersionsWithoutV0() { + return BrokerRegistrationRequestVersions().skip(1); + } + @ParameterizedTest @MethodSource("BrokerRegistrationRequestVersions") public void testBasicBuild(short version) { @@ -51,7 +60,88 @@ public void testBasicBuild(short version) { .setListeners(new BrokerRegistrationRequestData.ListenerCollection()) .setRack("a") .setPreviousBrokerEpoch(1L); - BrokerRegistrationRequest.Builder builder = new BrokerRegistrationRequest.Builder(data); + BrokerRegistrationRequestData data2 = readSerializedRequest(version, data); + + assertEquals(0, data2.brokerId(), "Unexpected broker ID in " + data2); + assertEquals("test", data2.clusterId(), "Unexpected cluster ID in " + data2); + assertEquals(incarnationId, data2.incarnationId(), "Unexpected incarnation ID in " + data2); + assertEquals("a", data2.rack(), "Unexpected rack in " + data2); + if (version >= 3) { + assertEquals(1, data2.previousBrokerEpoch(), "Unexpected previousBrokerEpoch in " + data2); + } else { + assertEquals(-1, data2.previousBrokerEpoch(), "Unexpected previousBrokerEpoch in " + data2); + } + } + + @Test + public void testV0SerializationFailsWithZkMigrationEnabled() { + assertThrows(UnsupportedVersionException.class, + () -> readSerializedRequest((short) 0, + new BrokerRegistrationRequestData(). + setIsMigratingZkBroker(true))); + } + + @ParameterizedTest + @MethodSource("BrokerRegistrationRequestVersionsWithoutV0") + public void testAlterFeaturesWithMinVersion0BeforeV4(short version) { + BrokerRegistrationRequestData data = readSerializedRequest(version, + new BrokerRegistrationRequestData(). + setBrokerId(1). + setIsMigratingZkBroker(true). + setClusterId("test"). + setRack(null). + setFeatures(new BrokerRegistrationRequestData.FeatureCollection( + Arrays.asList( + new BrokerRegistrationRequestData.Feature(). + setName("metadata.version"). + setMinSupportedVersion((short) 1). + setMaxSupportedVersion((short) 17), + new BrokerRegistrationRequestData.Feature(). + setName("kraft.version"). + setMinSupportedVersion((short) 0). + setMaxSupportedVersion((short) 1) + ).iterator())). + setIncarnationId(Uuid.fromString("EfIEKywJSaWl5yWDwlop1Q")). + setListeners(new BrokerRegistrationRequestData.ListenerCollection()). + setPreviousBrokerEpoch(1L)); + assertEquals(1, data.brokerId()); + assertNull(data.rack()); + assertEquals(Uuid.fromString("EfIEKywJSaWl5yWDwlop1Q"), data.incarnationId()); + if (version < 4) { + assertEquals(new BrokerRegistrationRequestData.FeatureCollection( + Arrays.asList( + new BrokerRegistrationRequestData.Feature(). + setName("metadata.version"). + setMinSupportedVersion((short) 1). + setMaxSupportedVersion((short) 17), + new BrokerRegistrationRequestData.Feature(). + setName("kraft.version"). + setMinSupportedVersion((short) 1). + setMaxSupportedVersion((short) 1)).iterator()), data.features()); + } else { + assertEquals(new BrokerRegistrationRequestData.FeatureCollection( + Arrays.asList( + new BrokerRegistrationRequestData.Feature(). + setName("metadata.version"). + setMinSupportedVersion((short) 1). + setMaxSupportedVersion((short) 17), + new BrokerRegistrationRequestData.Feature(). + setName("kraft.version"). + setMinSupportedVersion((short) 0). + setMaxSupportedVersion((short) 1)).iterator()), data.features()); + } + } + + static BrokerRegistrationRequestData readSerializedRequest( + short version, + BrokerRegistrationRequestData input + ) { + BrokerRegistrationRequest.Builder builder = new BrokerRegistrationRequest.Builder(input); + if (input.isMigratingZkBroker()) { + assertEquals((short) 1, builder.oldestAllowedVersion()); + } else { + assertEquals((short) 0, builder.oldestAllowedVersion()); + } BrokerRegistrationRequest request = builder.build(version); ObjectSerializationCache cache = new ObjectSerializationCache(); @@ -63,15 +153,7 @@ public void testBasicBuild(short version) { BrokerRegistrationRequestData data2 = new BrokerRegistrationRequestData(); buf.flip(); data2.read(byteBufferAccessor, version); - - assertEquals(0, data2.brokerId(), "Unexpected broker ID in " + data2); - assertEquals("test", data2.clusterId(), "Unexpected cluster ID in " + data2); - assertEquals(incarnationId, data2.incarnationId(), "Unexpected incarnation ID in " + data2); - assertEquals("a", data2.rack(), "Unexpected rack in " + data2); - if (version >= 3) { - assertEquals(1, data2.previousBrokerEpoch(), "Unexpected previousBrokerEpoch in " + data2); - } else { - assertEquals(-1, data2.previousBrokerEpoch(), "Unexpected previousBrokerEpoch in " + data2); - } + return data2; } + } diff --git a/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslAuthenticatorTest.java b/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslAuthenticatorTest.java index 883332f93e1dc..145e887a5be2d 100644 --- a/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslAuthenticatorTest.java +++ b/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslAuthenticatorTest.java @@ -121,7 +121,6 @@ import java.util.Set; import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Function; -import java.util.function.Supplier; import java.util.stream.Collectors; import java.util.stream.IntStream; @@ -1951,7 +1950,7 @@ private NioEchoServer startServerApiVersionsUnsupportedByClient(final SecurityPr if (isScram) ScramCredentialUtils.createCache(credentialCache, Collections.singletonList(saslMechanism)); - Supplier apiVersionSupplier = () -> { + Function apiVersionSupplier = version -> { ApiVersionCollection versionCollection = new ApiVersionCollection(2); versionCollection.add(new ApiVersion().setApiKey(ApiKeys.SASL_HANDSHAKE.id).setMinVersion((short) 0).setMaxVersion((short) 100)); versionCollection.add(new ApiVersion().setApiKey(ApiKeys.SASL_AUTHENTICATE.id).setMinVersion((short) 0).setMaxVersion((short) 100)); @@ -1980,7 +1979,7 @@ private NioEchoServer startServerWithoutSaslAuthenticateHeader(final SecurityPro if (isScram) ScramCredentialUtils.createCache(credentialCache, Collections.singletonList(saslMechanism)); - Supplier apiVersionSupplier = () -> { + Function apiVersionSupplier = version -> { ApiVersionsResponse defaultApiVersionResponse = TestUtils.defaultApiVersionsResponse( ApiMessageType.ListenerType.ZK_BROKER); ApiVersionCollection apiVersions = new ApiVersionCollection(); @@ -2577,7 +2576,7 @@ public AlternateSaslChannelBuilder(ConnectionMode connectionMode, Map TestUtils.defaultApiVersionsResponse(ApiMessageType.ListenerType.ZK_BROKER)); + version -> TestUtils.defaultApiVersionsResponse(ApiMessageType.ListenerType.ZK_BROKER)); } @Override diff --git a/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticatorTest.java b/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticatorTest.java index 9d41da5cf0fec..6778e3357f3a8 100644 --- a/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticatorTest.java +++ b/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticatorTest.java @@ -384,7 +384,7 @@ private SaslServerAuthenticator setupAuthenticator(Map configs, Trans return new SaslServerAuthenticator(configs, callbackHandlers, "node", subjects, null, new ListenerName("ssl"), SecurityProtocol.SASL_SSL, transportLayer, connectionsMaxReauthMsByMechanism, - metadataRegistry, time, () -> apiVersionsResponse); + metadataRegistry, time, version -> apiVersionsResponse); } } diff --git a/clients/src/test/java/org/apache/kafka/test/TestUtils.java b/clients/src/test/java/org/apache/kafka/test/TestUtils.java index 44a571353222a..bf9e948845bec 100644 --- a/clients/src/test/java/org/apache/kafka/test/TestUtils.java +++ b/clients/src/test/java/org/apache/kafka/test/TestUtils.java @@ -698,12 +698,13 @@ public static ApiVersionsResponse createApiVersionsResponse( Features latestSupportedFeatures, boolean zkMigrationEnabled ) { - return ApiVersionsResponse.createApiVersionsResponse( - throttleTimeMs, - apiVersions, - latestSupportedFeatures, - Collections.emptyMap(), - ApiVersionsResponse.UNKNOWN_FINALIZED_FEATURES_EPOCH, - zkMigrationEnabled); + return new ApiVersionsResponse.Builder(). + setThrottleTimeMs(throttleTimeMs). + setApiVersions(apiVersions). + setSupportedFeatures(latestSupportedFeatures). + setFinalizedFeatures(Collections.emptyMap()). + setFinalizedFeaturesEpoch(ApiVersionsResponse.UNKNOWN_FINALIZED_FEATURES_EPOCH). + setZkMigrationEnabled(zkMigrationEnabled). + build(); } } diff --git a/core/src/main/scala/kafka/network/SocketServer.scala b/core/src/main/scala/kafka/network/SocketServer.scala index 4352b26d95855..cfdcd0598afe9 100644 --- a/core/src/main/scala/kafka/network/SocketServer.scala +++ b/core/src/main/scala/kafka/network/SocketServer.scala @@ -974,7 +974,7 @@ private[kafka] class Processor( credentialProvider.tokenCache, time, logContext, - () => apiVersionManager.apiVersionResponse(throttleTimeMs = 0) + version => apiVersionManager.apiVersionResponse(throttleTimeMs = 0, version < 4) ) ) diff --git a/core/src/main/scala/kafka/server/ApiVersionManager.scala b/core/src/main/scala/kafka/server/ApiVersionManager.scala index 77ba7d69dee73..41576419031c2 100644 --- a/core/src/main/scala/kafka/server/ApiVersionManager.scala +++ b/core/src/main/scala/kafka/server/ApiVersionManager.scala @@ -33,7 +33,7 @@ trait ApiVersionManager { def listenerType: ListenerType def enabledApis: collection.Set[ApiKeys] - def apiVersionResponse(throttleTimeMs: Int): ApiVersionsResponse + def apiVersionResponse(throttleTimeMs: Int, alterFeatureLevel0: Boolean): ApiVersionsResponse def isApiEnabled(apiKey: ApiKeys, apiVersion: Short): Boolean = { apiKey != null && apiKey.inScope(listenerType) && apiKey.isVersionEnabled(apiVersion, enableUnstableLastVersion) @@ -102,16 +102,20 @@ class SimpleApiVersionManager( private val apiVersions = ApiVersionsResponse.collectApis(enabledApis.asJava, enableUnstableLastVersion) - override def apiVersionResponse(throttleTimeMs: Int): ApiVersionsResponse = { + override def apiVersionResponse( + throttleTimeMs: Int, + alterFeatureLevel0: Boolean + ): ApiVersionsResponse = { val currentFeatures = features - ApiVersionsResponse.createApiVersionsResponse( - throttleTimeMs, - apiVersions, - brokerFeatures, - currentFeatures.finalizedFeatures(), - currentFeatures.finalizedFeaturesEpoch(), - zkMigrationEnabled - ) + new ApiVersionsResponse.Builder(). + setThrottleTimeMs(throttleTimeMs). + setApiVersions(apiVersions). + setSupportedFeatures(brokerFeatures). + setFinalizedFeatures(currentFeatures.finalizedFeatures()). + setFinalizedFeaturesEpoch(currentFeatures.finalizedFeaturesEpoch()). + setZkMigrationEnabled(zkMigrationEnabled). + setAlterFeatureLevel0(alterFeatureLevel0). + build() } override def features: FinalizedFeatures = featuresProvider.apply() @@ -142,27 +146,39 @@ class DefaultApiVersionManager( val enabledApis: mutable.Set[ApiKeys] = ApiKeys.apisForListener(listenerType).asScala - override def apiVersionResponse(throttleTimeMs: Int): ApiVersionsResponse = { - val supportedFeatures = brokerFeatures.supportedFeatures + override def apiVersionResponse( + throttleTimeMs: Int, + alterFeatureLevel0: Boolean + ): ApiVersionsResponse = { val finalizedFeatures = metadataCache.features() val controllerApiVersions = forwardingManager.flatMap(_.controllerApiVersions) val clientTelemetryEnabled = clientMetricsManager match { case Some(manager) => manager.isTelemetryReceiverConfigured case None => false } - - ApiVersionsResponse.createApiVersionsResponse( - throttleTimeMs, - finalizedFeatures.metadataVersion().highestSupportedRecordVersion, - supportedFeatures, - finalizedFeatures.finalizedFeatures(), - finalizedFeatures.finalizedFeaturesEpoch(), - controllerApiVersions.orNull, - listenerType, - enableUnstableLastVersion, - zkMigrationEnabled, - clientTelemetryEnabled - ) + val apiVersions = if (controllerApiVersions.isDefined) { + ApiVersionsResponse.controllerApiVersions( + finalizedFeatures.metadataVersion().highestSupportedRecordVersion, + controllerApiVersions.get, + listenerType, + enableUnstableLastVersion, + clientTelemetryEnabled) + } else { + ApiVersionsResponse.brokerApiVersions( + finalizedFeatures.metadataVersion().highestSupportedRecordVersion, + listenerType, + enableUnstableLastVersion, + clientTelemetryEnabled) + } + new ApiVersionsResponse.Builder(). + setThrottleTimeMs(throttleTimeMs). + setApiVersions(apiVersions). + setSupportedFeatures(brokerFeatures.supportedFeatures). + setFinalizedFeatures(finalizedFeatures.finalizedFeatures()). + setFinalizedFeaturesEpoch(finalizedFeatures.finalizedFeaturesEpoch()). + setZkMigrationEnabled(zkMigrationEnabled). + setAlterFeatureLevel0(alterFeatureLevel0). + build() } override def features: FinalizedFeatures = metadataCache.features() diff --git a/core/src/main/scala/kafka/server/BrokerFeatures.scala b/core/src/main/scala/kafka/server/BrokerFeatures.scala index e685448de2134..f900783e158cc 100644 --- a/core/src/main/scala/kafka/server/BrokerFeatures.scala +++ b/core/src/main/scala/kafka/server/BrokerFeatures.scala @@ -45,7 +45,12 @@ class BrokerFeatures private (@volatile var supportedFeatures: Features[Supporte */ def defaultFinalizedFeatures: Map[String, Short] = { supportedFeatures.features.asScala.map { - case(name, versionRange) => (name, versionRange.max) + case(name, versionRange) => + if (name.equals("kraft.version")) { + (name, 0.toShort) + } else { + (name, versionRange.max) + } }.toMap } @@ -93,6 +98,9 @@ object BrokerFeatures extends Logging { feature.latestProduction })) } + if (unstableFeatureVersionsEnabled) { + features.put("kraft.version", new SupportedVersionRange(0, 1)) + } Features.supportedFeatures(features) } diff --git a/core/src/main/scala/kafka/server/ControllerApis.scala b/core/src/main/scala/kafka/server/ControllerApis.scala index 12f882032e2a3..59d5d0bef1007 100644 --- a/core/src/main/scala/kafka/server/ControllerApis.scala +++ b/core/src/main/scala/kafka/server/ControllerApis.scala @@ -462,7 +462,7 @@ class ControllerApis( requestThrottleMs => apiVersionRequest.getErrorResponse(requestThrottleMs, INVALID_REQUEST.exception)) } else { requestHelper.sendResponseMaybeThrottle(request, - requestThrottleMs => apiVersionManager.apiVersionResponse(requestThrottleMs)) + requestThrottleMs => apiVersionManager.apiVersionResponse(requestThrottleMs, request.header.apiVersion() < 4)) } CompletableFuture.completedFuture[Unit](()) } diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala index f696a0faba732..5a439539807c6 100644 --- a/core/src/main/scala/kafka/server/KafkaApis.scala +++ b/core/src/main/scala/kafka/server/KafkaApis.scala @@ -1973,7 +1973,7 @@ class KafkaApis(val requestChannel: RequestChannel, } else if (!apiVersionRequest.isValid) { apiVersionRequest.getErrorResponse(requestThrottleMs, Errors.INVALID_REQUEST.exception) } else { - apiVersionManager.apiVersionResponse(requestThrottleMs) + apiVersionManager.apiVersionResponse(requestThrottleMs, request.header.apiVersion() < 4) } } requestHelper.sendResponseMaybeThrottle(request, createResponseCallback) diff --git a/core/src/main/scala/kafka/tools/TestRaftRequestHandler.scala b/core/src/main/scala/kafka/tools/TestRaftRequestHandler.scala index 64119ab7ed4f7..e1130493fbf6e 100644 --- a/core/src/main/scala/kafka/tools/TestRaftRequestHandler.scala +++ b/core/src/main/scala/kafka/tools/TestRaftRequestHandler.scala @@ -64,7 +64,7 @@ class TestRaftRequestHandler( } private def handleApiVersions(request: RequestChannel.Request): Unit = { - requestChannel.sendResponse(request, apiVersionManager.apiVersionResponse(throttleTimeMs = 0), None) + requestChannel.sendResponse(request, apiVersionManager.apiVersionResponse(throttleTimeMs = 0, request.header.apiVersion() < 4), None) } private def handleVote(request: RequestChannel.Request): Unit = { diff --git a/core/src/test/scala/integration/kafka/server/GssapiAuthenticationTest.scala b/core/src/test/scala/integration/kafka/server/GssapiAuthenticationTest.scala index f2b7b0cbe08e7..65e70d96726a1 100644 --- a/core/src/test/scala/integration/kafka/server/GssapiAuthenticationTest.scala +++ b/core/src/test/scala/integration/kafka/server/GssapiAuthenticationTest.scala @@ -264,7 +264,7 @@ class GssapiAuthenticationTest extends IntegrationTestHarness with SaslSetup { val jaasContexts = Collections.singletonMap("GSSAPI", JaasContext.loadClientContext(config.values())) val channelBuilder = new SaslChannelBuilder(ConnectionMode.CLIENT, jaasContexts, securityProtocol, null, false, kafkaClientSaslMechanism, true, null, null, null, time, new LogContext(), - () => org.apache.kafka.test.TestUtils.defaultApiVersionsResponse(ListenerType.ZK_BROKER)) { + _ => org.apache.kafka.test.TestUtils.defaultApiVersionsResponse(ListenerType.ZK_BROKER)) { override protected def defaultLoginClass(): Class[_ <: Login] = classOf[TestableKerberosLogin] } channelBuilder.configure(config.values()) diff --git a/core/src/test/scala/unit/kafka/server/AbstractApiVersionsRequestTest.scala b/core/src/test/scala/unit/kafka/server/AbstractApiVersionsRequestTest.scala index 30318c1ccd967..5c792d97f1fbf 100644 --- a/core/src/test/scala/unit/kafka/server/AbstractApiVersionsRequestTest.scala +++ b/core/src/test/scala/unit/kafka/server/AbstractApiVersionsRequestTest.scala @@ -69,8 +69,13 @@ abstract class AbstractApiVersionsRequestTest(cluster: ClusterInstance) { assertEquals(MetadataVersion.latestTesting().featureLevel(), apiVersionsResponse.data().finalizedFeatures().find(MetadataVersion.FEATURE_NAME).minVersionLevel()) assertEquals(MetadataVersion.latestTesting().featureLevel(), apiVersionsResponse.data().finalizedFeatures().find(MetadataVersion.FEATURE_NAME).maxVersionLevel()) - assertEquals(1, apiVersionsResponse.data().supportedFeatures().size()) + assertEquals(2, apiVersionsResponse.data().supportedFeatures().size()) assertEquals(MetadataVersion.MINIMUM_KRAFT_VERSION.featureLevel(), apiVersionsResponse.data().supportedFeatures().find(MetadataVersion.FEATURE_NAME).minVersion()) + if (apiVersion < 4) { + assertEquals(1, apiVersionsResponse.data().supportedFeatures().find("kraft.version").minVersion()) + } else { + assertEquals(0, apiVersionsResponse.data().supportedFeatures().find("kraft.version").minVersion()) + } assertEquals(MetadataVersion.latestTesting().featureLevel(), apiVersionsResponse.data().supportedFeatures().find(MetadataVersion.FEATURE_NAME).maxVersion()) } val expectedApis = if (!cluster.isKRaftTest) { diff --git a/core/src/test/scala/unit/kafka/server/ApiVersionManagerTest.scala b/core/src/test/scala/unit/kafka/server/ApiVersionManagerTest.scala index eeaaae1940c60..ae0dcb94a4f44 100644 --- a/core/src/test/scala/unit/kafka/server/ApiVersionManagerTest.scala +++ b/core/src/test/scala/unit/kafka/server/ApiVersionManagerTest.scala @@ -91,7 +91,7 @@ class ApiVersionManagerTest { enableUnstableLastVersion = true ) - val apiVersionsResponse = versionManager.apiVersionResponse(throttleTimeMs = 0) + val apiVersionsResponse = versionManager.apiVersionResponse(throttleTimeMs = 0, false) val alterConfigVersion = apiVersionsResponse.data.apiKeys.find(ApiKeys.CREATE_TOPICS.id) assertNotNull(alterConfigVersion) assertEquals(controllerMinVersion, alterConfigVersion.minVersion) @@ -114,7 +114,7 @@ class ApiVersionManagerTest { assertFalse(versionManager.isApiEnabled(ApiKeys.ENVELOPE, ApiKeys.ENVELOPE.latestVersion)) assertFalse(versionManager.enabledApis.contains(ApiKeys.ENVELOPE)) - val apiVersionsResponse = versionManager.apiVersionResponse(throttleTimeMs = 0) + val apiVersionsResponse = versionManager.apiVersionResponse(throttleTimeMs = 0, false) val envelopeVersion = apiVersionsResponse.data.apiKeys.find(ApiKeys.ENVELOPE.id) assertNull(envelopeVersion) } @@ -136,7 +136,7 @@ class ApiVersionManagerTest { assertTrue(versionManager.isApiEnabled(ApiKeys.ENVELOPE, ApiKeys.ENVELOPE.latestVersion)) assertTrue(versionManager.enabledApis.contains(ApiKeys.ENVELOPE)) - val apiVersionsResponse = versionManager.apiVersionResponse(throttleTimeMs = 0) + val apiVersionsResponse = versionManager.apiVersionResponse(throttleTimeMs = 0, false) val envelopeVersion = apiVersionsResponse.data.apiKeys.find(ApiKeys.ENVELOPE.id) assertNotNull(envelopeVersion) assertEquals(ApiKeys.ENVELOPE.oldestVersion, envelopeVersion.minVersion) @@ -155,7 +155,7 @@ class ApiVersionManagerTest { assertTrue(versionManager.isApiEnabled(ApiKeys.ENVELOPE, ApiKeys.ENVELOPE.latestVersion)) assertTrue(versionManager.enabledApis.contains(ApiKeys.ENVELOPE)) - val apiVersionsResponse = versionManager.apiVersionResponse(throttleTimeMs = 0) + val apiVersionsResponse = versionManager.apiVersionResponse(throttleTimeMs = 0, false) assertNotNull(apiVersionsResponse.data.apiKeys.find(ApiKeys.ENVELOPE.id)) } } diff --git a/core/src/test/scala/unit/kafka/server/ApiVersionsResponseIntegrationTest.scala b/core/src/test/scala/unit/kafka/server/ApiVersionsResponseIntegrationTest.scala new file mode 100644 index 0000000000000..a5a7564d4912a --- /dev/null +++ b/core/src/test/scala/unit/kafka/server/ApiVersionsResponseIntegrationTest.scala @@ -0,0 +1,75 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + **/ +package kafka.server + +import org.apache.kafka.common.message.ApiVersionsResponseData.SupportedFeatureKeyCollection +import org.apache.kafka.common.requests.ApiVersionsRequest +import org.apache.kafka.common.requests.ApiVersionsResponse +import org.apache.kafka.coordinator.group.GroupCoordinatorConfig +import org.apache.kafka.server.config.ServerConfigs +import org.junit.jupiter.api.Assertions.{assertEquals, assertNotNull} +import org.junit.jupiter.params.ParameterizedTest +import org.junit.jupiter.params.provider.ValueSource + +import java.util.Properties + +class ApiVersionsResponseIntegrationTest extends BaseRequestTest { + override def brokerCount: Int = 1 + + override def brokerPropertyOverrides(properties: Properties): Unit = { + properties.put(ServerConfigs.CONTROLLED_SHUTDOWN_ENABLE_CONFIG, "false") + properties.put(GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, "1") + properties.put(GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, "1") + } + + private def sendApiVersionsRequest(version: Short): ApiVersionsResponse = { + val request = new ApiVersionsRequest.Builder().build(version) + connectAndReceive[ApiVersionsResponse](request) + } + + @ParameterizedTest + @ValueSource(strings = Array("zk", "kraft")) + def testSendV3ApiVersionsRequest(quorum: String): Unit = { + val response = sendApiVersionsRequest(3) + if (quorum.equals("kraft")) { + assertFeatureHasMinVersion("metadata.version", response.data().supportedFeatures(), 1) + assertFeatureHasMinVersion("kraft.version", response.data().supportedFeatures(), 1) + } else { + assertEquals(0, response.data().supportedFeatures().size()) + } + } + + @ParameterizedTest + @ValueSource(strings = Array("zk", "kraft")) + def testSendV4ApiVersionsRequest(quorum: String): Unit = { + val response = sendApiVersionsRequest(4) + if (quorum.equals("kraft")) { + assertFeatureHasMinVersion("metadata.version", response.data().supportedFeatures(), 1) + assertFeatureHasMinVersion("kraft.version", response.data().supportedFeatures(), 0) + } else { + assertEquals(0, response.data().supportedFeatures().size()) + } + } + + def assertFeatureHasMinVersion( + name: String, + coll: SupportedFeatureKeyCollection, + expectedMinVersion: Short + ): Unit = { + val key = coll.find(name) + assertNotNull(key) + assertEquals(name, key.name()) + assertEquals(expectedMinVersion, key.minVersion()) + } +} diff --git a/core/src/test/scala/unit/kafka/server/BrokerFeaturesTest.scala b/core/src/test/scala/unit/kafka/server/BrokerFeaturesTest.scala index 78432b7110580..94412b3582c41 100644 --- a/core/src/test/scala/unit/kafka/server/BrokerFeaturesTest.scala +++ b/core/src/test/scala/unit/kafka/server/BrokerFeaturesTest.scala @@ -95,6 +95,7 @@ class BrokerFeaturesTest { val expectedFeatures = Map[String, Short]( MetadataVersion.FEATURE_NAME -> MetadataVersion.latestTesting().featureLevel(), + "kraft.version" -> 0, "test_feature_1" -> 4, "test_feature_2" -> 3, "test_feature_3" -> 7) diff --git a/tools/src/test/java/org/apache/kafka/tools/FeatureCommandTest.java b/tools/src/test/java/org/apache/kafka/tools/FeatureCommandTest.java index eb7819137cc8c..4b6671dbd90a4 100644 --- a/tools/src/test/java/org/apache/kafka/tools/FeatureCommandTest.java +++ b/tools/src/test/java/org/apache/kafka/tools/FeatureCommandTest.java @@ -47,8 +47,8 @@ import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; -@ExtendWith(value = ClusterTestExtensions.class) @Tag("integration") +@ExtendWith(value = ClusterTestExtensions.class) public class FeatureCommandTest { @ClusterTest(types = {Type.ZK}, metadataVersion = MetadataVersion.IBP_3_3_IV1) public void testDescribeWithZK(ClusterInstance cluster) { @@ -67,8 +67,10 @@ public void testDescribeWithKRaft(ClusterInstance cluster) { List features = Arrays.stream(commandOutput.split("\n")).sorted().collect(Collectors.toList()); // Change expected message to reflect latest MetadataVersion (SupportedMaxVersion increases when adding a new version) + assertEquals("Feature: kraft.version\tSupportedMinVersion: 0\t" + + "SupportedMaxVersion: 1\tFinalizedVersionLevel: 0\t", outputWithoutEpoch(features.get(0))); assertEquals("Feature: metadata.version\tSupportedMinVersion: 3.0-IV1\t" + - "SupportedMaxVersion: 4.0-IV0\tFinalizedVersionLevel: 3.3-IV1\t", outputWithoutEpoch(features.get(0))); + "SupportedMaxVersion: 4.0-IV0\tFinalizedVersionLevel: 3.3-IV1\t", outputWithoutEpoch(features.get(1))); } // Use the first MetadataVersion that supports KIP-919 @@ -81,8 +83,10 @@ public void testDescribeWithKRaftAndBootstrapControllers(ClusterInstance cluster List features = Arrays.stream(commandOutput.split("\n")).sorted().collect(Collectors.toList()); // Change expected message to reflect latest MetadataVersion (SupportedMaxVersion increases when adding a new version) + assertEquals("Feature: kraft.version\tSupportedMinVersion: 0\t" + + "SupportedMaxVersion: 1\tFinalizedVersionLevel: 0\t", outputWithoutEpoch(features.get(0))); assertEquals("Feature: metadata.version\tSupportedMinVersion: 3.0-IV1\t" + - "SupportedMaxVersion: 4.0-IV0\tFinalizedVersionLevel: 3.7-IV0\t", outputWithoutEpoch(features.get(0))); + "SupportedMaxVersion: 4.0-IV0\tFinalizedVersionLevel: 3.7-IV0\t", outputWithoutEpoch(features.get(1))); } @ClusterTest(types = {Type.ZK}, metadataVersion = MetadataVersion.IBP_3_3_IV1)