From bfdac177419aebafaa43cd871bd37010829d1b11 Mon Sep 17 00:00:00 2001 From: dengziming Date: Sun, 13 Dec 2020 17:42:20 +0800 Subject: [PATCH] remove ApiKeys#parseResponse and ApiKeys#parseRequest --- .../apache/kafka/common/protocol/ApiKeys.java | 48 +------------------ .../kafka/common/protocol/ApiKeysTest.java | 35 -------------- .../requests/ApiVersionsResponseTest.java | 28 +++++++++++ 3 files changed, 29 insertions(+), 82 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java b/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java index c48e6e100364a..9975868c9dfab 100644 --- a/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java +++ b/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java @@ -136,12 +136,9 @@ import org.apache.kafka.common.message.WriteTxnMarkersRequestData; import org.apache.kafka.common.message.WriteTxnMarkersResponseData; import org.apache.kafka.common.protocol.types.Schema; -import org.apache.kafka.common.protocol.types.SchemaException; -import org.apache.kafka.common.protocol.types.Struct; import org.apache.kafka.common.protocol.types.Type; import org.apache.kafka.common.record.RecordBatch; -import java.nio.ByteBuffer; import java.util.Arrays; import java.util.List; import java.util.concurrent.atomic.AtomicBoolean; @@ -178,15 +175,7 @@ public enum ApiKeys { DescribeGroupsResponseData.SCHEMAS), LIST_GROUPS(16, "ListGroups", ListGroupsRequestData.SCHEMAS, ListGroupsResponseData.SCHEMAS), SASL_HANDSHAKE(17, "SaslHandshake", SaslHandshakeRequestData.SCHEMAS, SaslHandshakeResponseData.SCHEMAS), - API_VERSIONS(18, "ApiVersions", ApiVersionsRequestData.SCHEMAS, ApiVersionsResponseData.SCHEMAS) { - @Override - public Struct parseResponse(short version, ByteBuffer buffer) { - // Fallback to version 0 for ApiVersions response. If a client sends an ApiVersionsRequest - // using a version higher than that supported by the broker, a version 0 response is sent - // to the client indicating UNSUPPORTED_VERSION. - return parseResponse(version, buffer, (short) 0); - } - }, + API_VERSIONS(18, "ApiVersions", ApiVersionsRequestData.SCHEMAS, ApiVersionsResponseData.SCHEMAS), CREATE_TOPICS(19, "CreateTopics", CreateTopicsRequestData.SCHEMAS, CreateTopicsResponseData.SCHEMAS, true), DELETE_TOPICS(20, "DeleteTopics", DeleteTopicsRequestData.SCHEMAS, DeleteTopicsResponseData.SCHEMAS, true), DELETE_RECORDS(21, "DeleteRecords", DeleteRecordsRequestData.SCHEMAS, DeleteRecordsResponseData.SCHEMAS), @@ -379,41 +368,6 @@ public short oldestVersion() { return 0; } - public Schema requestSchema(short version) { - return schemaFor(requestSchemas, version); - } - - public Schema responseSchema(short version) { - return schemaFor(responseSchemas, version); - } - - public Struct parseRequest(short version, ByteBuffer buffer) { - return requestSchema(version).read(buffer); - } - - public Struct parseResponse(short version, ByteBuffer buffer) { - return responseSchema(version).read(buffer); - } - - protected Struct parseResponse(short version, ByteBuffer buffer, short fallbackVersion) { - int bufferPosition = buffer.position(); - try { - return responseSchema(version).read(buffer); - } catch (SchemaException e) { - if (version != fallbackVersion) { - buffer.position(bufferPosition); - return responseSchema(fallbackVersion).read(buffer); - } else - throw e; - } - } - - private Schema schemaFor(Schema[] versions, short version) { - if (!isVersionSupported(version)) - throw new IllegalArgumentException("Invalid version for API key " + this + ": " + version); - return versions[version]; - } - public boolean isVersionSupported(short apiVersion) { return apiVersion >= oldestVersion() && apiVersion <= latestVersion(); } diff --git a/clients/src/test/java/org/apache/kafka/common/protocol/ApiKeysTest.java b/clients/src/test/java/org/apache/kafka/common/protocol/ApiKeysTest.java index 78b59a5b542d4..3845aa8aeaf2e 100644 --- a/clients/src/test/java/org/apache/kafka/common/protocol/ApiKeysTest.java +++ b/clients/src/test/java/org/apache/kafka/common/protocol/ApiKeysTest.java @@ -16,16 +16,8 @@ */ package org.apache.kafka.common.protocol; -import org.apache.kafka.common.protocol.types.BoundField; -import org.apache.kafka.common.protocol.types.Schema; import org.junit.Test; -import java.util.Arrays; -import java.util.List; - -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertNull; - public class ApiKeysTest { @Test(expected = IllegalArgumentException.class) @@ -38,31 +30,4 @@ public void testForIdWithInvalidIdHigh() { ApiKeys.forId(10000); } - @Test(expected = IllegalArgumentException.class) - public void schemaVersionOutOfRange() { - ApiKeys.PRODUCE.requestSchema((short) ApiKeys.PRODUCE.requestSchemas.length); - } - - /** - * All valid client responses which may be throttled should have a field named - * 'throttle_time_ms' to return the throttle time to the client. Exclusions are - * - */ - @Test - public void testResponseThrottleTime() { - List authenticationKeys = Arrays.asList(ApiKeys.SASL_HANDSHAKE, ApiKeys.SASL_AUTHENTICATE); - for (ApiKeys apiKey: ApiKeys.values()) { - Schema responseSchema = apiKey.responseSchema(apiKey.latestVersion()); - BoundField throttleTimeField = responseSchema.get(CommonFields.THROTTLE_TIME_MS.name); - if (apiKey.clusterAction || authenticationKeys.contains(apiKey)) - assertNull("Unexpected throttle time field: " + apiKey, throttleTimeField); - else - assertNotNull("Throttle time field missing: " + apiKey, throttleTimeField); - } - } } diff --git a/clients/src/test/java/org/apache/kafka/common/requests/ApiVersionsResponseTest.java b/clients/src/test/java/org/apache/kafka/common/requests/ApiVersionsResponseTest.java index 98b0c149ce11b..7f39b18908c97 100644 --- a/clients/src/test/java/org/apache/kafka/common/requests/ApiVersionsResponseTest.java +++ b/clients/src/test/java/org/apache/kafka/common/requests/ApiVersionsResponseTest.java @@ -19,10 +19,15 @@ import org.apache.kafka.common.message.ApiVersionsResponseData.ApiVersionsResponseKey; import org.apache.kafka.common.protocol.ApiKeys; +import org.apache.kafka.common.protocol.CommonFields; +import org.apache.kafka.common.protocol.types.BoundField; +import org.apache.kafka.common.protocol.types.Schema; import org.junit.Test; +import java.util.Arrays; import java.util.Collection; import java.util.HashSet; +import java.util.List; import java.util.Set; import static org.junit.Assert.assertEquals; @@ -69,6 +74,29 @@ public void shouldHaveCorrectDefaultApiVersionsResponse() { assertEquals(ApiVersionsResponse.UNKNOWN_FINALIZED_FEATURES_EPOCH, ApiVersionsResponse.DEFAULT_API_VERSIONS_RESPONSE.data.finalizedFeaturesEpoch()); } + /** + * All valid client responses which may be throttled should have a field named + * 'throttle_time_ms' to return the throttle time to the client. Exclusions are + * + */ + @Test + public void testResponseThrottleTime() { + List authenticationKeys = Arrays.asList(ApiKeys.SASL_HANDSHAKE, ApiKeys.SASL_AUTHENTICATE); + for (ApiKeys apiKey: ApiKeys.values()) { + Schema responseSchema = apiKey.responseSchemas[apiKey.latestVersion()]; + BoundField throttleTimeField = responseSchema.get(CommonFields.THROTTLE_TIME_MS.name); + if (apiKey.clusterAction || authenticationKeys.contains(apiKey)) + assertNull("Unexpected throttle time field: " + apiKey, throttleTimeField); + else + assertNotNull("Throttle time field missing: " + apiKey, throttleTimeField); + } + } + private Set apiKeysInResponse(final ApiVersionsResponse apiVersions) { final Set apiKeys = new HashSet<>();