diff --git a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java index a6060586e05f3..54ba2063499be 100644 --- a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java +++ b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java @@ -23,7 +23,7 @@ import org.apache.kafka.common.errors.AuthenticationException; import org.apache.kafka.common.errors.DisconnectException; import org.apache.kafka.common.errors.UnsupportedVersionException; -import org.apache.kafka.common.message.ApiVersionsResponseData.ApiVersionsResponseKey; +import org.apache.kafka.common.message.ApiVersionsResponseData.ApiVersion; import org.apache.kafka.common.metrics.Sensor; import org.apache.kafka.common.network.ChannelState; import org.apache.kafka.common.network.NetworkSend; @@ -905,7 +905,7 @@ private void handleApiVersionsResponse(List responses, // If not provided, the client falls back to version 0. short maxApiVersion = 0; if (apiVersionsResponse.data().apiKeys().size() > 0) { - ApiVersionsResponseKey apiVersion = apiVersionsResponse.data().apiKeys().find(ApiKeys.API_VERSIONS.id); + ApiVersion apiVersion = apiVersionsResponse.data().apiKeys().find(ApiKeys.API_VERSIONS.id); if (apiVersion != null) { maxApiVersion = apiVersion.maxVersion(); } diff --git a/clients/src/main/java/org/apache/kafka/clients/NodeApiVersions.java b/clients/src/main/java/org/apache/kafka/clients/NodeApiVersions.java index 30351c64b4a3f..7588dee312931 100644 --- a/clients/src/main/java/org/apache/kafka/clients/NodeApiVersions.java +++ b/clients/src/main/java/org/apache/kafka/clients/NodeApiVersions.java @@ -20,10 +20,10 @@ import java.util.Collections; import java.util.LinkedList; import org.apache.kafka.common.errors.UnsupportedVersionException; -import org.apache.kafka.common.message.ApiVersionsResponseData.ApiVersionsResponseKey; -import org.apache.kafka.common.message.ApiVersionsResponseData.ApiVersionsResponseKeyCollection; +import org.apache.kafka.common.message.ApiVersionsResponseData.ApiVersion; +import org.apache.kafka.common.message.ApiVersionsResponseData.ApiVersionCollection; import org.apache.kafka.common.protocol.ApiKeys; -import org.apache.kafka.common.protocol.ApiVersion; +import org.apache.kafka.common.requests.ApiVersionsResponse; import org.apache.kafka.common.utils.Utils; import java.util.ArrayList; @@ -50,7 +50,7 @@ public class NodeApiVersions { * @return A new NodeApiVersions object. */ public static NodeApiVersions create() { - return create(Collections.emptyList()); + return create(Collections.emptyList()); } /** @@ -65,14 +65,12 @@ public static NodeApiVersions create(Collection overrides) { for (ApiKeys apiKey : ApiKeys.enabledApis()) { boolean exists = false; for (ApiVersion apiVersion : apiVersions) { - if (apiVersion.apiKey == apiKey.id) { + if (apiVersion.apiKey() == apiKey.id) { exists = true; break; } } - if (!exists) { - apiVersions.add(new ApiVersion(apiKey)); - } + if (!exists) apiVersions.add(ApiVersionsResponse.toApiVersion(apiKey)); } return new NodeApiVersions(apiVersions); } @@ -87,25 +85,28 @@ public static NodeApiVersions create(Collection overrides) { * @return A new NodeApiVersions object. */ public static NodeApiVersions create(short apiKey, short minVersion, short maxVersion) { - return create(Collections.singleton(new ApiVersion(apiKey, minVersion, maxVersion))); + return create(Collections.singleton(new ApiVersion() + .setApiKey(apiKey) + .setMinVersion(minVersion) + .setMaxVersion(maxVersion))); } - public NodeApiVersions(ApiVersionsResponseKeyCollection nodeApiVersions) { - for (ApiVersionsResponseKey nodeApiVersion : nodeApiVersions) { + public NodeApiVersions(ApiVersionCollection nodeApiVersions) { + for (ApiVersion nodeApiVersion : nodeApiVersions) { if (ApiKeys.hasId(nodeApiVersion.apiKey())) { ApiKeys nodeApiKey = ApiKeys.forId(nodeApiVersion.apiKey()); - supportedVersions.put(nodeApiKey, new ApiVersion(nodeApiVersion)); + supportedVersions.put(nodeApiKey, nodeApiVersion); } else { // Newer brokers may support ApiKeys we don't know about - unknownApis.add(new ApiVersion(nodeApiVersion)); + unknownApis.add(nodeApiVersion); } } } public NodeApiVersions(Collection nodeApiVersions) { for (ApiVersion nodeApiVersion : nodeApiVersions) { - if (ApiKeys.hasId(nodeApiVersion.apiKey)) { - ApiKeys nodeApiKey = ApiKeys.forId(nodeApiVersion.apiKey); + if (ApiKeys.hasId(nodeApiVersion.apiKey())) { + ApiKeys nodeApiKey = ApiKeys.forId(nodeApiVersion.apiKey()); supportedVersions.put(nodeApiKey, nodeApiVersion); } else { // Newer brokers may support ApiKeys we don't know about @@ -128,15 +129,18 @@ public short latestUsableVersion(ApiKeys apiKey, short oldestAllowedVersion, sho if (!supportedVersions.containsKey(apiKey)) throw new UnsupportedVersionException("The broker does not support " + apiKey); ApiVersion supportedVersion = supportedVersions.get(apiKey); - Optional intersectVersion = supportedVersion.intersect( - new ApiVersion(apiKey.id, oldestAllowedVersion, latestAllowedVersion)); + Optional intersectVersion = ApiVersionsResponse.intersect(supportedVersion, + new ApiVersion() + .setApiKey(apiKey.id) + .setMinVersion(oldestAllowedVersion) + .setMaxVersion(latestAllowedVersion)); if (intersectVersion.isPresent()) - return intersectVersion.get().maxVersion; + return intersectVersion.get().maxVersion(); else throw new UnsupportedVersionException("The broker does not support " + apiKey + " with version in range [" + oldestAllowedVersion + "," + latestAllowedVersion + "]. The supported" + - " range is [" + supportedVersion.minVersion + "," + supportedVersion.maxVersion + "]."); + " range is [" + supportedVersion.minVersion() + "," + supportedVersion.maxVersion() + "]."); } /** @@ -160,9 +164,9 @@ public String toString(boolean lineBreaks) { // ascending order. TreeMap apiKeysText = new TreeMap<>(); for (ApiVersion supportedVersion : this.supportedVersions.values()) - apiKeysText.put(supportedVersion.apiKey, apiVersionToText(supportedVersion)); + apiKeysText.put(supportedVersion.apiKey(), apiVersionToText(supportedVersion)); for (ApiVersion apiVersion : unknownApis) - apiKeysText.put(apiVersion.apiKey, apiVersionToText(apiVersion)); + apiKeysText.put(apiVersion.apiKey(), apiVersionToText(apiVersion)); // Also handle the case where some apiKey types are not specified at all in the given ApiVersions, // which may happen when the remote is too old. @@ -189,27 +193,27 @@ public String toString(boolean lineBreaks) { private String apiVersionToText(ApiVersion apiVersion) { StringBuilder bld = new StringBuilder(); ApiKeys apiKey = null; - if (ApiKeys.hasId(apiVersion.apiKey)) { - apiKey = ApiKeys.forId(apiVersion.apiKey); + if (ApiKeys.hasId(apiVersion.apiKey())) { + apiKey = ApiKeys.forId(apiVersion.apiKey()); bld.append(apiKey.name).append("(").append(apiKey.id).append("): "); } else { - bld.append("UNKNOWN(").append(apiVersion.apiKey).append("): "); + bld.append("UNKNOWN(").append(apiVersion.apiKey()).append("): "); } - if (apiVersion.minVersion == apiVersion.maxVersion) { - bld.append(apiVersion.minVersion); + if (apiVersion.minVersion() == apiVersion.maxVersion()) { + bld.append(apiVersion.minVersion()); } else { - bld.append(apiVersion.minVersion).append(" to ").append(apiVersion.maxVersion); + bld.append(apiVersion.minVersion()).append(" to ").append(apiVersion.maxVersion()); } if (apiKey != null) { ApiVersion supportedVersion = supportedVersions.get(apiKey); - if (apiKey.latestVersion() < supportedVersion.minVersion) { + if (apiKey.latestVersion() < supportedVersion.minVersion()) { bld.append(" [unusable: node too new]"); - } else if (supportedVersion.maxVersion < apiKey.oldestVersion()) { + } else if (supportedVersion.maxVersion() < apiKey.oldestVersion()) { bld.append(" [unusable: node too old]"); } else { - short latestUsableVersion = Utils.min(apiKey.latestVersion(), supportedVersion.maxVersion); + short latestUsableVersion = Utils.min(apiKey.latestVersion(), supportedVersion.maxVersion()); bld.append(" [usable: ").append(latestUsableVersion).append("]"); } } diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java index 14615cb020fd0..6488294d4c541 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java @@ -16,7 +16,6 @@ */ package org.apache.kafka.clients.consumer.internals; -import org.apache.kafka.common.protocol.ApiVersion; import org.apache.kafka.clients.ApiVersions; import org.apache.kafka.clients.ClientResponse; import org.apache.kafka.clients.FetchSessionHandler; @@ -48,6 +47,7 @@ import org.apache.kafka.common.errors.TopicAuthorizationException; import org.apache.kafka.common.header.Headers; import org.apache.kafka.common.header.internals.RecordHeaders; +import org.apache.kafka.common.message.ApiVersionsResponseData.ApiVersion; import org.apache.kafka.common.message.ListOffsetsRequestData.ListOffsetsPartition; import org.apache.kafka.common.message.ListOffsetsResponseData.ListOffsetsPartitionResponse; import org.apache.kafka.common.message.ListOffsetsResponseData.ListOffsetsTopicResponse; @@ -774,7 +774,7 @@ static boolean hasUsableOffsetForLeaderEpochVersion(NodeApiVersions nodeApiVersi if (apiVersion == null) return false; - return OffsetsForLeaderEpochRequest.supportsTopicPermission(apiVersion.maxVersion); + return OffsetsForLeaderEpochRequest.supportsTopicPermission(apiVersion.maxVersion()); } /** diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java index 7f9b5a9b10865..47c6684f37c03 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java @@ -16,7 +16,6 @@ */ package org.apache.kafka.clients.producer.internals; -import org.apache.kafka.common.protocol.ApiVersion; import org.apache.kafka.clients.ApiVersions; import org.apache.kafka.clients.ClientResponse; import org.apache.kafka.clients.NodeApiVersions; @@ -28,6 +27,7 @@ import org.apache.kafka.common.errors.InvalidProducerEpochException; import org.apache.kafka.common.errors.RetriableException; import org.apache.kafka.common.errors.UnknownProducerIdException; +import org.apache.kafka.common.message.ApiVersionsResponseData.ApiVersion; import org.apache.kafka.common.protocol.ApiKeys; import org.apache.kafka.common.utils.ProducerIdAndEpoch; import org.apache.kafka.common.KafkaException; @@ -1051,7 +1051,7 @@ void handleCoordinatorReady() { nodeApiVersions.apiVersion(ApiKeys.INIT_PRODUCER_ID) : null; this.coordinatorSupportsBumpingEpoch = initProducerIdVersion != null && - initProducerIdVersion.maxVersion >= 3; + initProducerIdVersion.maxVersion() >= 3; } private void transitionTo(State target) { diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/ApiVersion.java b/clients/src/main/java/org/apache/kafka/common/protocol/ApiVersion.java deleted file mode 100644 index b1e557b140f14..0000000000000 --- a/clients/src/main/java/org/apache/kafka/common/protocol/ApiVersion.java +++ /dev/null @@ -1,67 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.kafka.common.protocol; - -import org.apache.kafka.common.message.ApiVersionsResponseData.ApiVersionsResponseKey; - -import java.util.Optional; - -/** - * Represents the min version and max version of an api key. - * - * NOTE: This class is intended for INTERNAL usage only within Kafka. - */ -public class ApiVersion { - public final short apiKey; - public final short minVersion; - public final short maxVersion; - - public ApiVersion(ApiKeys apiKey) { - this(apiKey.id, apiKey.oldestVersion(), apiKey.latestVersion()); - } - - public ApiVersion(short apiKey, short minVersion, short maxVersion) { - this.apiKey = apiKey; - this.minVersion = minVersion; - this.maxVersion = maxVersion; - } - - public ApiVersion(ApiVersionsResponseKey apiVersionsResponseKey) { - this.apiKey = apiVersionsResponseKey.apiKey(); - this.minVersion = apiVersionsResponseKey.minVersion(); - this.maxVersion = apiVersionsResponseKey.maxVersion(); - } - - @Override - public String toString() { - return "ApiVersion(" + - "apiKey=" + apiKey + - ", minVersion=" + minVersion + - ", maxVersion= " + maxVersion + - ")"; - } - - public Optional intersect(ApiVersion other) { - if (other == null) { - return Optional.empty(); - } - short minVersion = (short) Math.max(this.minVersion, other.minVersion); - short maxVersion = (short) Math.min(this.maxVersion, other.maxVersion); - return minVersion > maxVersion ? Optional.empty() : - Optional.of(new ApiVersion(apiKey, minVersion, maxVersion)); - } -} diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ApiVersionsRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/ApiVersionsRequest.java index 946a9d9da7bed..4ae819ceb10b6 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/ApiVersionsRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/ApiVersionsRequest.java @@ -19,8 +19,7 @@ import java.util.regex.Pattern; import org.apache.kafka.common.message.ApiVersionsRequestData; import org.apache.kafka.common.message.ApiVersionsResponseData; -import org.apache.kafka.common.message.ApiVersionsResponseData.ApiVersionsResponseKey; -import org.apache.kafka.common.message.ApiVersionsResponseData.ApiVersionsResponseKeyCollection; +import org.apache.kafka.common.message.ApiVersionsResponseData.ApiVersionCollection; import org.apache.kafka.common.protocol.ApiKeys; import org.apache.kafka.common.protocol.ByteBufferAccessor; import org.apache.kafka.common.protocol.Errors; @@ -108,11 +107,8 @@ public ApiVersionsResponse getErrorResponse(int throttleTimeMs, Throwable e) { // Starting from Apache Kafka 2.4 (KIP-511), ApiKeys field is populated with the supported // versions of the ApiVersionsRequest when an UNSUPPORTED_VERSION error is returned. if (Errors.forException(e) == Errors.UNSUPPORTED_VERSION) { - ApiVersionsResponseKeyCollection apiKeys = new ApiVersionsResponseKeyCollection(); - apiKeys.add(new ApiVersionsResponseKey() - .setApiKey(ApiKeys.API_VERSIONS.id) - .setMinVersion(ApiKeys.API_VERSIONS.oldestVersion()) - .setMaxVersion(ApiKeys.API_VERSIONS.latestVersion())); + ApiVersionCollection apiKeys = new ApiVersionCollection(); + apiKeys.add(ApiVersionsResponse.toApiVersion(ApiKeys.API_VERSIONS)); data.setApiKeys(apiKeys); } diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ApiVersionsResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/ApiVersionsResponse.java index 1261c5247c46f..3a6ff2fe9c6a0 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/ApiVersionsResponse.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/ApiVersionsResponse.java @@ -16,13 +16,12 @@ */ package org.apache.kafka.common.requests; -import org.apache.kafka.common.protocol.ApiVersion; import org.apache.kafka.common.feature.Features; import org.apache.kafka.common.feature.FinalizedVersionRange; import org.apache.kafka.common.feature.SupportedVersionRange; import org.apache.kafka.common.message.ApiVersionsResponseData; -import org.apache.kafka.common.message.ApiVersionsResponseData.ApiVersionsResponseKey; -import org.apache.kafka.common.message.ApiVersionsResponseData.ApiVersionsResponseKeyCollection; +import org.apache.kafka.common.message.ApiVersionsResponseData.ApiVersion; +import org.apache.kafka.common.message.ApiVersionsResponseData.ApiVersionCollection; import org.apache.kafka.common.message.ApiVersionsResponseData.FinalizedFeatureKey; import org.apache.kafka.common.message.ApiVersionsResponseData.FinalizedFeatureKeyCollection; import org.apache.kafka.common.message.ApiVersionsResponseData.SupportedFeatureKey; @@ -60,7 +59,7 @@ public ApiVersionsResponseData data() { return data; } - public ApiVersionsResponseKey apiVersion(short apiKey) { + public ApiVersion apiVersion(short apiKey) { return data.apiKeys().find(apiKey); } @@ -118,14 +117,11 @@ private static ApiVersionsResponse createApiVersionsResponse( finalizedFeaturesEpoch)); } - public static ApiVersionsResponseKeyCollection defaultApiKeys(final byte minMagic) { - ApiVersionsResponseKeyCollection apiKeys = new ApiVersionsResponseKeyCollection(); + public static ApiVersionCollection defaultApiKeys(final byte minMagic) { + ApiVersionCollection apiKeys = new ApiVersionCollection(); for (ApiKeys apiKey : ApiKeys.enabledApis()) { if (apiKey.minRequiredInterBrokerMagic <= minMagic) { - apiKeys.add(new ApiVersionsResponseKey() - .setApiKey(apiKey.id) - .setMinVersion(apiKey.oldestVersion()) - .setMaxVersion(apiKey.latestVersion())); + apiKeys.add(ApiVersionsResponse.toApiVersion(apiKey)); } } return apiKeys; @@ -138,22 +134,18 @@ public static ApiVersionsResponseKeyCollection defaultApiKeys(final byte minMagi * @param activeControllerApiVersions controller ApiVersions * @return commonly agreed ApiVersion collection */ - public static ApiVersionsResponseKeyCollection intersectControllerApiVersions(final byte minMagic, - final Map activeControllerApiVersions) { - ApiVersionsResponseKeyCollection apiKeys = new ApiVersionsResponseKeyCollection(); + public static ApiVersionCollection intersectControllerApiVersions(final byte minMagic, + final Map activeControllerApiVersions) { + ApiVersionCollection apiKeys = new ApiVersionCollection(); for (ApiKeys apiKey : ApiKeys.enabledApis()) { if (apiKey.minRequiredInterBrokerMagic <= minMagic) { - ApiVersion brokerApiVersion = new ApiVersion( - apiKey.id, - apiKey.oldestVersion(), - apiKey.latestVersion() - ); + ApiVersion brokerApiVersion = toApiVersion(apiKey); final ApiVersion finalApiVersion; if (!apiKey.forwardable) { finalApiVersion = brokerApiVersion; } else { - Optional intersectVersion = brokerApiVersion.intersect( + Optional intersectVersion = intersect(brokerApiVersion, activeControllerApiVersions.getOrDefault(apiKey, null)); if (intersectVersion.isPresent()) { finalApiVersion = intersectVersion.get(); @@ -163,10 +155,7 @@ public static ApiVersionsResponseKeyCollection intersectControllerApiVersions(fi } } - apiKeys.add(new ApiVersionsResponseKey() - .setApiKey(finalApiVersion.apiKey) - .setMinVersion(finalApiVersion.minVersion) - .setMaxVersion(finalApiVersion.maxVersion)); + apiKeys.add(finalApiVersion.duplicate()); } } return apiKeys; @@ -175,7 +164,7 @@ public static ApiVersionsResponseKeyCollection intersectControllerApiVersions(fi public static ApiVersionsResponseData createApiVersionsResponseData( final int throttleTimeMs, final Errors error, - final ApiVersionsResponseKeyCollection apiKeys, + final ApiVersionCollection apiKeys, final Features latestSupportedFeatures, final Features finalizedFeatures, final long finalizedFeaturesEpoch @@ -220,4 +209,27 @@ private static FinalizedFeatureKeyCollection createFinalizedFeatureKeys( return converted; } + + public static Optional intersect(ApiVersion thisVersion, + ApiVersion other) { + if (thisVersion == null || other == null) return Optional.empty(); + if (thisVersion.apiKey() != other.apiKey()) + throw new IllegalArgumentException("thisVersion.apiKey: " + thisVersion.apiKey() + + " must be equal to other.apiKey: " + other.apiKey()); + short minVersion = (short) Math.max(thisVersion.minVersion(), other.minVersion()); + short maxVersion = (short) Math.min(thisVersion.maxVersion(), other.maxVersion()); + return minVersion > maxVersion + ? Optional.empty() + : Optional.of(new ApiVersion() + .setApiKey(thisVersion.apiKey()) + .setMinVersion(minVersion) + .setMaxVersion(maxVersion)); + } + + public static ApiVersion toApiVersion(ApiKeys apiKey) { + return new ApiVersion() + .setApiKey(apiKey.id) + .setMinVersion(apiKey.oldestVersion()) + .setMaxVersion(apiKey.latestVersion()); + } } diff --git a/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslClientAuthenticator.java b/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslClientAuthenticator.java index 916585017a791..e502f80d5c05f 100644 --- a/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslClientAuthenticator.java +++ b/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslClientAuthenticator.java @@ -23,7 +23,7 @@ import org.apache.kafka.common.errors.IllegalSaslStateException; import org.apache.kafka.common.errors.SaslAuthenticationException; import org.apache.kafka.common.errors.UnsupportedSaslMechanismException; -import org.apache.kafka.common.message.ApiVersionsResponseData.ApiVersionsResponseKey; +import org.apache.kafka.common.message.ApiVersionsResponseData.ApiVersion; import org.apache.kafka.common.message.RequestHeaderData; import org.apache.kafka.common.message.SaslAuthenticateRequestData; import org.apache.kafka.common.message.SaslHandshakeRequestData; @@ -390,12 +390,12 @@ protected SaslHandshakeRequest createSaslHandshakeRequest(short version) { // Visible to override for testing protected void setSaslAuthenticateAndHandshakeVersions(ApiVersionsResponse apiVersionsResponse) { - ApiVersionsResponseKey authenticateVersion = apiVersionsResponse.apiVersion(ApiKeys.SASL_AUTHENTICATE.id); + ApiVersion authenticateVersion = apiVersionsResponse.apiVersion(ApiKeys.SASL_AUTHENTICATE.id); if (authenticateVersion != null) { this.saslAuthenticateVersion = (short) Math.min(authenticateVersion.maxVersion(), ApiKeys.SASL_AUTHENTICATE.latestVersion()); } - ApiVersionsResponseKey handshakeVersion = apiVersionsResponse.apiVersion(ApiKeys.SASL_HANDSHAKE.id); + ApiVersion handshakeVersion = apiVersionsResponse.apiVersion(ApiKeys.SASL_HANDSHAKE.id); if (handshakeVersion != null) { this.saslHandshakeVersion = (short) Math.min(handshakeVersion.maxVersion(), ApiKeys.SASL_HANDSHAKE.latestVersion()); diff --git a/clients/src/main/resources/common/message/ApiVersionsResponse.json b/clients/src/main/resources/common/message/ApiVersionsResponse.json index ba6f01cb9434a..31d2dc3eb6c58 100644 --- a/clients/src/main/resources/common/message/ApiVersionsResponse.json +++ b/clients/src/main/resources/common/message/ApiVersionsResponse.json @@ -32,7 +32,7 @@ "fields": [ { "name": "ErrorCode", "type": "int16", "versions": "0+", "about": "The top-level error code." }, - { "name": "ApiKeys", "type": "[]ApiVersionsResponseKey", "versions": "0+", + { "name": "ApiKeys", "type": "[]ApiVersion", "versions": "0+", "about": "The APIs supported by the broker.", "fields": [ { "name": "ApiKey", "type": "int16", "versions": "0+", "mapKey": true, "about": "The API index." }, diff --git a/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java b/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java index 342b2eb2fe469..52e74d4b86b80 100644 --- a/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java @@ -23,8 +23,8 @@ import org.apache.kafka.common.errors.UnsupportedVersionException; import org.apache.kafka.common.internals.ClusterResourceListeners; import org.apache.kafka.common.message.ApiVersionsResponseData; -import org.apache.kafka.common.message.ApiVersionsResponseData.ApiVersionsResponseKey; -import org.apache.kafka.common.message.ApiVersionsResponseData.ApiVersionsResponseKeyCollection; +import org.apache.kafka.common.message.ApiVersionsResponseData.ApiVersion; +import org.apache.kafka.common.message.ApiVersionsResponseData.ApiVersionCollection; import org.apache.kafka.common.message.ProduceRequestData; import org.apache.kafka.common.message.ProduceResponseData; import org.apache.kafka.common.network.NetworkReceive; @@ -304,8 +304,8 @@ public void testUnsupportedApiVersionsRequestWithVersionProvidedByTheBroker() { assertEquals(3, header.apiVersion()); // prepare response - ApiVersionsResponseKeyCollection apiKeys = new ApiVersionsResponseKeyCollection(); - apiKeys.add(new ApiVersionsResponseKey() + ApiVersionCollection apiKeys = new ApiVersionCollection(); + apiKeys.add(new ApiVersion() .setApiKey(ApiKeys.API_VERSIONS.id) .setMinVersion((short) 0) .setMaxVersion((short) 2)); @@ -526,19 +526,14 @@ public void testConnectionThrottling() { // Creates expected ApiVersionsResponse from the specified node, where the max protocol version for the specified // key is set to the specified version. private ApiVersionsResponse createExpectedApiVersionsResponse(ApiKeys key, short maxVersion) { - ApiVersionsResponseKeyCollection versionList = new ApiVersionsResponseKeyCollection(); + ApiVersionCollection versionList = new ApiVersionCollection(); for (ApiKeys apiKey : ApiKeys.values()) { if (apiKey == key) { - versionList.add(new ApiVersionsResponseKey() + versionList.add(new ApiVersion() .setApiKey(apiKey.id) .setMinVersion((short) 0) .setMaxVersion(maxVersion)); - } else { - versionList.add(new ApiVersionsResponseKey() - .setApiKey(apiKey.id) - .setMinVersion(apiKey.oldestVersion()) - .setMaxVersion(apiKey.latestVersion())); - } + } else versionList.add(ApiVersionsResponse.toApiVersion(apiKey)); } return new ApiVersionsResponse(new ApiVersionsResponseData() .setErrorCode(Errors.NONE.code()) diff --git a/clients/src/test/java/org/apache/kafka/clients/NodeApiVersionsTest.java b/clients/src/test/java/org/apache/kafka/clients/NodeApiVersionsTest.java index 1a44badd0730c..01d8cf6bdc434 100644 --- a/clients/src/test/java/org/apache/kafka/clients/NodeApiVersionsTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/NodeApiVersionsTest.java @@ -20,10 +20,9 @@ import java.util.LinkedList; import java.util.List; import org.apache.kafka.common.errors.UnsupportedVersionException; -import org.apache.kafka.common.message.ApiVersionsResponseData.ApiVersionsResponseKey; -import org.apache.kafka.common.message.ApiVersionsResponseData.ApiVersionsResponseKeyCollection; +import org.apache.kafka.common.message.ApiVersionsResponseData.ApiVersion; +import org.apache.kafka.common.message.ApiVersionsResponseData.ApiVersionCollection; import org.apache.kafka.common.protocol.ApiKeys; -import org.apache.kafka.common.protocol.ApiVersion; import org.apache.kafka.common.requests.ApiVersionsResponse; import org.junit.jupiter.api.Test; @@ -36,7 +35,7 @@ public class NodeApiVersionsTest { @Test public void testUnsupportedVersionsToString() { - NodeApiVersions versions = new NodeApiVersions(new ApiVersionsResponseKeyCollection()); + NodeApiVersions versions = new NodeApiVersions(new ApiVersionCollection()); StringBuilder bld = new StringBuilder(); String prefix = "("; for (ApiKeys apiKey : ApiKeys.enabledApis()) { @@ -59,10 +58,11 @@ public void testVersionsToString() { List versionList = new ArrayList<>(); for (ApiKeys apiKey : ApiKeys.values()) { if (apiKey == ApiKeys.DELETE_TOPICS) { - versionList.add(new ApiVersion(apiKey.id, (short) 10000, (short) 10001)); - } else { - versionList.add(new ApiVersion(apiKey)); - } + versionList.add(new ApiVersion() + .setApiKey(apiKey.id) + .setMinVersion((short) 10000) + .setMaxVersion((short) 10001)); + } else versionList.add(ApiVersionsResponse.toApiVersion(apiKey)); } NodeApiVersions versions = new NodeApiVersions(versionList); StringBuilder bld = new StringBuilder(); @@ -121,7 +121,7 @@ public void testLatestUsableVersionOutOfRangeHigh() { @Test public void testUsableVersionCalculationNoKnownVersions() { - NodeApiVersions versions = new NodeApiVersions(new ApiVersionsResponseKeyCollection()); + NodeApiVersions versions = new NodeApiVersions(new ApiVersionCollection()); assertThrows(UnsupportedVersionException.class, () -> versions.latestUsableVersion(ApiKeys.FETCH)); } @@ -135,12 +135,12 @@ public void testLatestUsableVersionOutOfRange() { @Test public void testUsableVersionLatestVersions() { - List versionList = new LinkedList<>(); - for (ApiVersionsResponseKey apiVersion: ApiVersionsResponse.DEFAULT_API_VERSIONS_RESPONSE.data().apiKeys()) { - versionList.add(new ApiVersion(apiVersion)); - } + List versionList = new LinkedList<>(ApiVersionsResponse.DEFAULT_API_VERSIONS_RESPONSE.data().apiKeys()); // Add an API key that we don't know about. - versionList.add(new ApiVersion((short) 100, (short) 0, (short) 1)); + versionList.add(new ApiVersion() + .setApiKey((short) 100) + .setMinVersion((short) 0) + .setMaxVersion((short) 1)); NodeApiVersions versions = new NodeApiVersions(versionList); for (ApiKeys apiKey: ApiKeys.values()) { if (apiKey.isEnabled) { @@ -156,11 +156,11 @@ public void testConstructionFromApiVersionsResponse() { ApiVersionsResponse apiVersionsResponse = ApiVersionsResponse.DEFAULT_API_VERSIONS_RESPONSE; NodeApiVersions versions = new NodeApiVersions(apiVersionsResponse.data().apiKeys()); - for (ApiVersionsResponseKey apiVersionKey : apiVersionsResponse.data().apiKeys()) { + for (ApiVersion apiVersionKey : apiVersionsResponse.data().apiKeys()) { ApiVersion apiVersion = versions.apiVersion(ApiKeys.forId(apiVersionKey.apiKey())); - assertEquals(apiVersionKey.apiKey(), apiVersion.apiKey); - assertEquals(apiVersionKey.minVersion(), apiVersion.minVersion); - assertEquals(apiVersionKey.maxVersion(), apiVersion.maxVersion); + assertEquals(apiVersionKey.apiKey(), apiVersion.apiKey()); + assertEquals(apiVersionKey.minVersion(), apiVersion.minVersion()); + assertEquals(apiVersionKey.maxVersion(), apiVersion.maxVersion()); } } } diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java index a9dd1711c48c6..b7a24aaa46a66 100644 --- a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java @@ -16,7 +16,6 @@ */ package org.apache.kafka.clients.admin; -import org.apache.kafka.common.protocol.ApiVersion; import org.apache.kafka.clients.ClientDnsLookup; import org.apache.kafka.clients.ClientUtils; import org.apache.kafka.clients.MockClient; @@ -75,6 +74,7 @@ import org.apache.kafka.common.message.AlterReplicaLogDirsResponseData.AlterReplicaLogDirTopicResult; import org.apache.kafka.common.message.AlterUserScramCredentialsResponseData; import org.apache.kafka.common.message.ApiVersionsResponseData; +import org.apache.kafka.common.message.ApiVersionsResponseData.ApiVersion; import org.apache.kafka.common.message.CreateAclsResponseData; import org.apache.kafka.common.message.CreatePartitionsResponseData; import org.apache.kafka.common.message.CreatePartitionsResponseData.CreatePartitionsTopicResult; @@ -2215,7 +2215,10 @@ public void testListConsumerGroupsWithStates() throws Exception { @Test public void testListConsumerGroupsWithStatesOlderBrokerVersion() throws Exception { - ApiVersion listGroupV3 = new ApiVersion(ApiKeys.LIST_GROUPS.id, (short) 0, (short) 3); + ApiVersion listGroupV3 = new ApiVersion() + .setApiKey(ApiKeys.LIST_GROUPS.id) + .setMinVersion((short) 0) + .setMaxVersion((short) 3); try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(mockCluster(1, 0))) { env.kafkaClient().setNodeApiVersions(NodeApiVersions.create(Collections.singletonList(listGroupV3))); diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java index edfb3747dc06a..1550567864633 100644 --- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java @@ -16,7 +16,6 @@ */ package org.apache.kafka.clients.producer.internals; -import org.apache.kafka.common.protocol.ApiVersion; import org.apache.kafka.clients.ApiVersions; import org.apache.kafka.clients.MockClient; import org.apache.kafka.clients.consumer.CommitFailedException; @@ -24,6 +23,7 @@ import org.apache.kafka.clients.NodeApiVersions; import org.apache.kafka.clients.consumer.OffsetAndMetadata; import org.apache.kafka.common.errors.FencedInstanceIdException; +import org.apache.kafka.common.message.ApiVersionsResponseData.ApiVersion; import org.apache.kafka.common.requests.JoinGroupRequest; import org.apache.kafka.common.requests.RequestTestUtils; import org.apache.kafka.common.utils.ProducerIdAndEpoch; @@ -153,8 +153,14 @@ private void initializeTransactionManager(Optional transactionalId, bool Metrics metrics = new Metrics(time); apiVersions.update("0", new NodeApiVersions(Arrays.asList( - new ApiVersion(ApiKeys.INIT_PRODUCER_ID.id, (short) 0, (short) 3), - new ApiVersion(ApiKeys.PRODUCE.id, (short) 0, (short) 7)))); + new ApiVersion() + .setApiKey(ApiKeys.INIT_PRODUCER_ID.id) + .setMinVersion((short) 0) + .setMaxVersion((short) 3), + new ApiVersion() + .setApiKey(ApiKeys.PRODUCE.id) + .setMinVersion((short) 0) + .setMaxVersion((short) 7)))); this.transactionManager = new TransactionManager(logContext, transactionalId.orElse(null), transactionTimeoutMs, DEFAULT_RETRY_BACKOFF_MS, apiVersions, autoDowngrade); @@ -2536,8 +2542,14 @@ public void testDropCommitOnBatchExpiry() throws InterruptedException { @Test public void testTransitionToFatalErrorWhenRetriedBatchIsExpired() throws InterruptedException { apiVersions.update("0", new NodeApiVersions(Arrays.asList( - new ApiVersion(ApiKeys.INIT_PRODUCER_ID.id, (short) 0, (short) 1), - new ApiVersion(ApiKeys.PRODUCE.id, (short) 0, (short) 7)))); + new ApiVersion() + .setApiKey(ApiKeys.INIT_PRODUCER_ID.id) + .setMinVersion((short) 0) + .setMaxVersion((short) 1), + new ApiVersion() + .setApiKey(ApiKeys.PRODUCE.id) + .setMinVersion((short) 0) + .setMaxVersion((short) 7)))); doInitTransactions(); @@ -2730,8 +2742,14 @@ public void testNoFailedBatchHandlingWhenTxnManagerIsInFatalError() { @Test public void testAbortTransactionAndReuseSequenceNumberOnError() throws InterruptedException { apiVersions.update("0", new NodeApiVersions(Arrays.asList( - new ApiVersion(ApiKeys.INIT_PRODUCER_ID.id, (short) 0, (short) 1), - new ApiVersion(ApiKeys.PRODUCE.id, (short) 0, (short) 7)))); + new ApiVersion() + .setApiKey(ApiKeys.INIT_PRODUCER_ID.id) + .setMinVersion((short) 0) + .setMaxVersion((short) 1), + new ApiVersion() + .setApiKey(ApiKeys.PRODUCE.id) + .setMinVersion((short) 0) + .setMaxVersion((short) 7)))); doInitTransactions(); @@ -2777,8 +2795,14 @@ public void testAbortTransactionAndResetSequenceNumberOnUnknownProducerId() thro // where the sequence number is reset on an UnknownProducerId error, allowing subsequent transactions to // append to the log successfully apiVersions.update("0", new NodeApiVersions(Arrays.asList( - new ApiVersion(ApiKeys.INIT_PRODUCER_ID.id, (short) 0, (short) 1), - new ApiVersion(ApiKeys.PRODUCE.id, (short) 0, (short) 7)))); + new ApiVersion() + .setApiKey(ApiKeys.INIT_PRODUCER_ID.id) + .setMinVersion((short) 0) + .setMaxVersion((short) 1), + new ApiVersion() + .setApiKey(ApiKeys.PRODUCE.id) + .setMinVersion((short) 0) + .setMaxVersion((short) 7)))); doInitTransactions(); diff --git a/clients/src/test/java/org/apache/kafka/common/requests/ApiVersionsResponseTest.java b/clients/src/test/java/org/apache/kafka/common/requests/ApiVersionsResponseTest.java index 1a3f450036a77..f387a06a26068 100644 --- a/clients/src/test/java/org/apache/kafka/common/requests/ApiVersionsResponseTest.java +++ b/clients/src/test/java/org/apache/kafka/common/requests/ApiVersionsResponseTest.java @@ -17,9 +17,8 @@ package org.apache.kafka.common.requests; -import org.apache.kafka.common.protocol.ApiVersion; -import org.apache.kafka.common.message.ApiVersionsResponseData.ApiVersionsResponseKey; -import org.apache.kafka.common.message.ApiVersionsResponseData.ApiVersionsResponseKeyCollection; +import org.apache.kafka.common.message.ApiVersionsResponseData.ApiVersion; +import org.apache.kafka.common.message.ApiVersionsResponseData.ApiVersionCollection; import org.apache.kafka.common.protocol.ApiKeys; import org.apache.kafka.common.record.RecordBatch; import org.apache.kafka.common.utils.Utils; @@ -31,8 +30,10 @@ import java.util.Set; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; public class ApiVersionsResponseTest { @@ -47,11 +48,11 @@ public void shouldCreateApiResponseThatHasAllApiKeysSupportedByBroker() { @Test public void shouldHaveCorrectDefaultApiVersionsResponse() { - Collection apiVersions = ApiVersionsResponse.DEFAULT_API_VERSIONS_RESPONSE.data().apiKeys(); + Collection apiVersions = ApiVersionsResponse.DEFAULT_API_VERSIONS_RESPONSE.data().apiKeys(); assertEquals(apiVersions.size(), ApiKeys.enabledApis().size(), "API versions for all API keys must be maintained."); for (ApiKeys key : ApiKeys.enabledApis()) { - ApiVersionsResponseKey version = ApiVersionsResponse.DEFAULT_API_VERSIONS_RESPONSE.apiVersion(key.id); + ApiVersion version = ApiVersionsResponse.DEFAULT_API_VERSIONS_RESPONSE.apiVersion(key.id); assertNotNull(version, "Could not find ApiVersion for API " + key.name); assertEquals(version.minVersion(), key.oldestVersion(), "Incorrect min version for Api " + key.name); assertEquals(version.maxVersion(), key.latestVersion(), "Incorrect max version for Api " + key.name); @@ -85,11 +86,17 @@ public void shouldHaveCommonlyAgreedApiVersionResponseWithControllerOnForwardabl final short minVersion = 0; final short maxVersion = 1; Map activeControllerApiVersions = Utils.mkMap( - Utils.mkEntry(forwardableAPIKey, new ApiVersion(forwardableAPIKey.id, minVersion, maxVersion)), - Utils.mkEntry(nonForwardableAPIKey, new ApiVersion(nonForwardableAPIKey.id, minVersion, maxVersion)) + Utils.mkEntry(forwardableAPIKey, new ApiVersion() + .setApiKey(forwardableAPIKey.id) + .setMinVersion(minVersion) + .setMaxVersion(maxVersion)), + Utils.mkEntry(nonForwardableAPIKey, new ApiVersion() + .setApiKey(nonForwardableAPIKey.id) + .setMinVersion(minVersion) + .setMaxVersion(maxVersion)) ); - ApiVersionsResponseKeyCollection commonResponse = ApiVersionsResponse.intersectControllerApiVersions( + ApiVersionCollection commonResponse = ApiVersionsResponse.intersectControllerApiVersions( RecordBatch.CURRENT_MAGIC_VALUE, activeControllerApiVersions); @@ -99,12 +106,43 @@ public void shouldHaveCommonlyAgreedApiVersionResponseWithControllerOnForwardabl ApiKeys.JOIN_GROUP.latestVersion(), commonResponse); } + @Test + public void testIntersect() { + assertFalse(ApiVersionsResponse.intersect(null, null).isPresent()); + assertThrows(IllegalArgumentException.class, + () -> ApiVersionsResponse.intersect(new ApiVersion().setApiKey((short) 10), new ApiVersion().setApiKey((short) 3))); + + short min = 0; + short max = 10; + ApiVersion thisVersion = new ApiVersion() + .setApiKey(ApiKeys.FETCH.id) + .setMinVersion(min) + .setMaxVersion(Short.MAX_VALUE); + + ApiVersion other = new ApiVersion() + .setApiKey(ApiKeys.FETCH.id) + .setMinVersion(Short.MIN_VALUE) + .setMaxVersion(max); + + ApiVersion expected = new ApiVersion() + .setApiKey(ApiKeys.FETCH.id) + .setMinVersion(min) + .setMaxVersion(max); + + assertFalse(ApiVersionsResponse.intersect(thisVersion, null).isPresent()); + assertFalse(ApiVersionsResponse.intersect(null, other).isPresent()); + + assertEquals(expected, ApiVersionsResponse.intersect(thisVersion, other).get()); + // test for symmetric + assertEquals(expected, ApiVersionsResponse.intersect(other, thisVersion).get()); + } + private void verifyVersions(short forwardableAPIKey, short minVersion, short maxVersion, - ApiVersionsResponseKeyCollection commonResponse) { - ApiVersionsResponseKey expectedVersionsForForwardableAPI = - new ApiVersionsResponseKey() + ApiVersionCollection commonResponse) { + ApiVersion expectedVersionsForForwardableAPI = + new ApiVersion() .setApiKey(forwardableAPIKey) .setMinVersion(minVersion) .setMaxVersion(maxVersion); @@ -113,7 +151,7 @@ private void verifyVersions(short forwardableAPIKey, private Set apiKeysInResponse(final ApiVersionsResponse apiVersions) { final Set apiKeys = new HashSet<>(); - for (final ApiVersionsResponseKey version : apiVersions.data().apiKeys()) { + for (final ApiVersion version : apiVersions.data().apiKeys()) { apiKeys.add(ApiKeys.forId(version.apiKey())); } return apiKeys; diff --git a/clients/src/test/java/org/apache/kafka/common/requests/RequestContextTest.java b/clients/src/test/java/org/apache/kafka/common/requests/RequestContextTest.java index da4e52e03030c..4415ff960aafb 100644 --- a/clients/src/test/java/org/apache/kafka/common/requests/RequestContextTest.java +++ b/clients/src/test/java/org/apache/kafka/common/requests/RequestContextTest.java @@ -17,7 +17,7 @@ package org.apache.kafka.common.requests; import org.apache.kafka.common.message.ApiVersionsResponseData; -import org.apache.kafka.common.message.ApiVersionsResponseData.ApiVersionsResponseKeyCollection; +import org.apache.kafka.common.message.ApiVersionsResponseData.ApiVersionCollection; import org.apache.kafka.common.message.CreateTopicsResponseData; import org.apache.kafka.common.network.ClientInformation; import org.apache.kafka.common.network.ListenerName; @@ -60,7 +60,7 @@ public void testSerdeUnsupportedApiVersionRequest() throws Exception { Send send = context.buildResponseSend(new ApiVersionsResponse(new ApiVersionsResponseData() .setThrottleTimeMs(0) .setErrorCode(Errors.UNSUPPORTED_VERSION.code()) - .setApiKeys(new ApiVersionsResponseKeyCollection()))); + .setApiKeys(new ApiVersionCollection()))); ByteBufferChannel channel = new ByteBufferChannel(256); send.writeTo(channel); diff --git a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java index 0461f0e074f7f..cd1f7b651bf1c 100644 --- a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java +++ b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java @@ -46,8 +46,8 @@ import org.apache.kafka.common.message.AlterReplicaLogDirsResponseData; import org.apache.kafka.common.message.ApiVersionsRequestData; import org.apache.kafka.common.message.ApiVersionsResponseData; -import org.apache.kafka.common.message.ApiVersionsResponseData.ApiVersionsResponseKey; -import org.apache.kafka.common.message.ApiVersionsResponseData.ApiVersionsResponseKeyCollection; +import org.apache.kafka.common.message.ApiVersionsResponseData.ApiVersion; +import org.apache.kafka.common.message.ApiVersionsResponseData.ApiVersionCollection; import org.apache.kafka.common.message.ControlledShutdownRequestData; import org.apache.kafka.common.message.ControlledShutdownResponseData; import org.apache.kafka.common.message.ControlledShutdownResponseData.RemainingPartition; @@ -940,7 +940,7 @@ public void testApiVersionResponseWithUnsupportedError() { assertEquals(Errors.UNSUPPORTED_VERSION.code(), response.data().errorCode()); - ApiVersionsResponseKey apiVersion = response.data().apiKeys().find(ApiKeys.API_VERSIONS.id); + ApiVersion apiVersion = response.data().apiKeys().find(ApiKeys.API_VERSIONS.id); assertNotNull(apiVersion); assertEquals(ApiKeys.API_VERSIONS.id, apiVersion.apiKey()); assertEquals(ApiKeys.API_VERSIONS.oldestVersion(), apiVersion.minVersion()); @@ -1700,8 +1700,8 @@ private ApiVersionsRequest createApiVersionRequest() { } private ApiVersionsResponse createApiVersionResponse() { - ApiVersionsResponseKeyCollection apiVersions = new ApiVersionsResponseKeyCollection(); - apiVersions.add(new ApiVersionsResponseKey() + ApiVersionCollection apiVersions = new ApiVersionCollection(); + apiVersions.add(new ApiVersion() .setApiKey((short) 0) .setMinVersion((short) 0) .setMaxVersion((short) 2)); diff --git a/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslAuthenticatorTest.java b/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslAuthenticatorTest.java index 070bb738535e6..2caa926d1f4bb 100644 --- a/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslAuthenticatorTest.java +++ b/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslAuthenticatorTest.java @@ -58,8 +58,8 @@ import org.apache.kafka.common.errors.SaslAuthenticationException; import org.apache.kafka.common.message.ApiVersionsRequestData; import org.apache.kafka.common.message.ApiVersionsResponseData; -import org.apache.kafka.common.message.ApiVersionsResponseData.ApiVersionsResponseKey; -import org.apache.kafka.common.message.ApiVersionsResponseData.ApiVersionsResponseKeyCollection; +import org.apache.kafka.common.message.ApiVersionsResponseData.ApiVersion; +import org.apache.kafka.common.message.ApiVersionsResponseData.ApiVersionCollection; import org.apache.kafka.common.message.ListOffsetsResponseData; import org.apache.kafka.common.message.ListOffsetsResponseData.ListOffsetsTopicResponse; import org.apache.kafka.common.message.ListOffsetsResponseData.ListOffsetsPartitionResponse; @@ -713,7 +713,7 @@ public void testApiVersionsRequestWithServerUnsupportedVersion() throws Exceptio ApiVersionsResponse response = ApiVersionsResponse.parse(responseBuffer, (short) 0); assertEquals(Errors.UNSUPPORTED_VERSION.code(), response.data().errorCode()); - ApiVersionsResponseKey apiVersion = response.data().apiKeys().find(ApiKeys.API_VERSIONS.id); + ApiVersion apiVersion = response.data().apiKeys().find(ApiKeys.API_VERSIONS.id); assertNotNull(apiVersion); assertEquals(ApiKeys.API_VERSIONS.id, apiVersion.apiKey()); assertEquals(ApiKeys.API_VERSIONS.oldestVersion(), apiVersion.minVersion()); @@ -1844,9 +1844,9 @@ protected SaslServerAuthenticator buildServerAuthenticator(Map config @Override protected ApiVersionsResponse apiVersionsResponse() { - ApiVersionsResponseKeyCollection versionCollection = new ApiVersionsResponseKeyCollection(2); - versionCollection.add(new ApiVersionsResponseKey().setApiKey(ApiKeys.SASL_HANDSHAKE.id).setMinVersion((short) 0).setMaxVersion((short) 100)); - versionCollection.add(new ApiVersionsResponseKey().setApiKey(ApiKeys.SASL_AUTHENTICATE.id).setMinVersion((short) 0).setMaxVersion((short) 100)); + ApiVersionCollection versionCollection = new ApiVersionCollection(2); + versionCollection.add(new ApiVersion().setApiKey(ApiKeys.SASL_HANDSHAKE.id).setMinVersion((short) 0).setMaxVersion((short) 100)); + versionCollection.add(new ApiVersion().setApiKey(ApiKeys.SASL_AUTHENTICATE.id).setMinVersion((short) 0).setMaxVersion((short) 100)); return new ApiVersionsResponse(new ApiVersionsResponseData().setApiKeys(versionCollection)); } }; @@ -1887,16 +1887,12 @@ protected SaslServerAuthenticator buildServerAuthenticator(Map config @Override protected ApiVersionsResponse apiVersionsResponse() { ApiVersionsResponse defaultApiVersionResponse = ApiVersionsResponse.DEFAULT_API_VERSIONS_RESPONSE; - ApiVersionsResponseKeyCollection apiVersions = new ApiVersionsResponseKeyCollection(); - for (ApiVersionsResponseKey apiVersion : defaultApiVersionResponse.data().apiKeys()) { + ApiVersionCollection apiVersions = new ApiVersionCollection(); + for (ApiVersion apiVersion : defaultApiVersionResponse.data().apiKeys()) { if (apiVersion.apiKey() != ApiKeys.SASL_AUTHENTICATE.id) { - // ApiVersionsResponseKey can NOT be reused in second ApiVersionsResponseKeyCollection + // ApiVersion can NOT be reused in second ApiVersionCollection // due to the internal pointers it contains. - apiVersions.add(new ApiVersionsResponseKey() - .setApiKey(apiVersion.apiKey()) - .setMinVersion(apiVersion.minVersion()) - .setMaxVersion(apiVersion.maxVersion()) - ); + apiVersions.add(apiVersion.duplicate()); } } diff --git a/core/src/main/scala/kafka/admin/BrokerApiVersionsCommand.scala b/core/src/main/scala/kafka/admin/BrokerApiVersionsCommand.scala index c8a4b3d400c35..79a2e7025fd97 100644 --- a/core/src/main/scala/kafka/admin/BrokerApiVersionsCommand.scala +++ b/core/src/main/scala/kafka/admin/BrokerApiVersionsCommand.scala @@ -40,7 +40,7 @@ import org.apache.kafka.common.protocol.{ApiKeys, Errors} import org.apache.kafka.common.utils.LogContext import org.apache.kafka.common.utils.{KafkaThread, Time} import org.apache.kafka.common.Node -import org.apache.kafka.common.message.ApiVersionsResponseData.ApiVersionsResponseKeyCollection +import org.apache.kafka.common.message.ApiVersionsResponseData.ApiVersionCollection import org.apache.kafka.common.requests.{AbstractRequest, AbstractResponse, ApiVersionsRequest, ApiVersionsResponse, MetadataRequest, MetadataResponse} import scala.jdk.CollectionConverters._ @@ -159,7 +159,7 @@ object BrokerApiVersionsCommand { throw new RuntimeException(s"Request $api failed on brokers $bootstrapBrokers") } - private def getApiVersions(node: Node): ApiVersionsResponseKeyCollection = { + private def getApiVersions(node: Node): ApiVersionCollection = { val response = send(node, ApiKeys.API_VERSIONS, new ApiVersionsRequest.Builder()).asInstanceOf[ApiVersionsResponse] Errors.forCode(response.data.errorCode).maybeThrow() response.data.apiKeys diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala index 41f33225491f3..af57e61f83f3a 100644 --- a/core/src/main/scala/kafka/server/KafkaApis.scala +++ b/core/src/main/scala/kafka/server/KafkaApis.scala @@ -1763,7 +1763,7 @@ class KafkaApis(val requestChannel: RequestChannel, } if (request.context.fromPrivilegedListener) { apiVersionsResponse.data.apiKeys().add( - new ApiVersionsResponseData.ApiVersionsResponseKey() + new ApiVersionsResponseData.ApiVersion() .setApiKey(ApiKeys.ENVELOPE.id) .setMinVersion(ApiKeys.ENVELOPE.oldestVersion()) .setMaxVersion(ApiKeys.ENVELOPE.latestVersion()) diff --git a/core/src/test/scala/unit/kafka/raft/KafkaNetworkChannelTest.scala b/core/src/test/scala/unit/kafka/raft/KafkaNetworkChannelTest.scala index 7cff35f2b7b31..a92865cbc4ca2 100644 --- a/core/src/test/scala/unit/kafka/raft/KafkaNetworkChannelTest.scala +++ b/core/src/test/scala/unit/kafka/raft/KafkaNetworkChannelTest.scala @@ -19,12 +19,11 @@ package kafka.raft import java.net.InetSocketAddress import java.util import java.util.Collections - import org.apache.kafka.clients.MockClient.MockMetadataUpdater import org.apache.kafka.clients.{MockClient, NodeApiVersions} import org.apache.kafka.common.message.{BeginQuorumEpochResponseData, EndQuorumEpochResponseData, FetchResponseData, VoteResponseData} -import org.apache.kafka.common.protocol.{ApiKeys, ApiMessage, ApiVersion, Errors} -import org.apache.kafka.common.requests.{AbstractResponse, BeginQuorumEpochRequest, BeginQuorumEpochResponse, EndQuorumEpochRequest, EndQuorumEpochResponse, FetchResponse, VoteRequest, VoteResponse} +import org.apache.kafka.common.protocol.{ApiKeys, ApiMessage, Errors} +import org.apache.kafka.common.requests.{AbstractResponse, ApiVersionsResponse, BeginQuorumEpochRequest, BeginQuorumEpochResponse, EndQuorumEpochRequest, EndQuorumEpochResponse, FetchResponse, VoteRequest, VoteResponse} import org.apache.kafka.common.utils.{MockTime, Time} import org.apache.kafka.common.{Node, TopicPartition} import org.apache.kafka.raft.{RaftRequest, RaftUtil} @@ -45,7 +44,7 @@ class KafkaNetworkChannelTest { @BeforeEach def setupSupportedApis(): Unit = { - val supportedApis = RaftApis.map(api => new ApiVersion(api)) + val supportedApis = RaftApis.map(ApiVersionsResponse.toApiVersion) client.setNodeApiVersions(NodeApiVersions.create(supportedApis.asJava)) } diff --git a/core/src/test/scala/unit/kafka/server/AbstractApiVersionsRequestTest.scala b/core/src/test/scala/unit/kafka/server/AbstractApiVersionsRequestTest.scala index bed566eb7600a..fd599c6e4ea5b 100644 --- a/core/src/test/scala/unit/kafka/server/AbstractApiVersionsRequestTest.scala +++ b/core/src/test/scala/unit/kafka/server/AbstractApiVersionsRequestTest.scala @@ -17,8 +17,7 @@ package kafka.server import java.util.Properties - -import org.apache.kafka.common.message.ApiVersionsResponseData.ApiVersionsResponseKey +import org.apache.kafka.common.message.ApiVersionsResponseData.ApiVersion import org.apache.kafka.common.network.ListenerName import org.apache.kafka.common.protocol.ApiKeys import org.apache.kafka.common.requests.{ApiVersionsRequest, ApiVersionsResponse} @@ -54,7 +53,7 @@ abstract class AbstractApiVersionsRequestTest extends BaseRequestTest { } assertEquals(expectedApis.size(), apiVersionsResponse.data.apiKeys().size(), "API keys in ApiVersionsResponse must match API keys supported by broker.") - for (expectedApiVersion: ApiVersionsResponseKey <- ApiVersionsResponse.DEFAULT_API_VERSIONS_RESPONSE.data.apiKeys().asScala) { + for (expectedApiVersion: ApiVersion <- ApiVersionsResponse.DEFAULT_API_VERSIONS_RESPONSE.data.apiKeys().asScala) { val actualApiVersion = apiVersionsResponse.apiVersion(expectedApiVersion.apiKey) assertNotNull(actualApiVersion, s"API key ${actualApiVersion.apiKey} is supported by broker, but not received in ApiVersionsResponse.") assertEquals(expectedApiVersion.apiKey, actualApiVersion.apiKey, "API key must be supported by the broker.") diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala index 227e21440fb35..ae8aeea0994b2 100644 --- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala +++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala @@ -616,7 +616,7 @@ class KafkaApisTest { createKafkaApis(authorizer = Some(authorizer), enableForwarding = true).handleApiVersionsRequest(request) - val expectedVersions = new ApiVersionsResponseData.ApiVersionsResponseKey() + val expectedVersions = new ApiVersionsResponseData.ApiVersion() .setApiKey(ApiKeys.ALTER_CONFIGS.id) .setMaxVersion(permittedVersion) .setMinVersion(permittedVersion) @@ -655,10 +655,7 @@ class KafkaApisTest { .asInstanceOf[ApiVersionsResponse] assertEquals(Errors.NONE, Errors.forCode(response.data().errorCode())) - val expectedVersions = new ApiVersionsResponseData.ApiVersionsResponseKey() - .setApiKey(ApiKeys.ALTER_CONFIGS.id) - .setMaxVersion(ApiKeys.ALTER_CONFIGS.latestVersion()) - .setMinVersion(ApiKeys.ALTER_CONFIGS.oldestVersion()) + val expectedVersions = ApiVersionsResponse.toApiVersion(ApiKeys.ALTER_CONFIGS) val alterConfigVersions = response.data().apiKeys().find(ApiKeys.ALTER_CONFIGS.id) assertEquals(expectedVersions, alterConfigVersions)