diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java b/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java index c48e6e100364a..9975868c9dfab 100644 --- a/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java +++ b/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java @@ -136,12 +136,9 @@ import org.apache.kafka.common.message.WriteTxnMarkersRequestData; import org.apache.kafka.common.message.WriteTxnMarkersResponseData; import org.apache.kafka.common.protocol.types.Schema; -import org.apache.kafka.common.protocol.types.SchemaException; -import org.apache.kafka.common.protocol.types.Struct; import org.apache.kafka.common.protocol.types.Type; import org.apache.kafka.common.record.RecordBatch; -import java.nio.ByteBuffer; import java.util.Arrays; import java.util.List; import java.util.concurrent.atomic.AtomicBoolean; @@ -178,15 +175,7 @@ public enum ApiKeys { DescribeGroupsResponseData.SCHEMAS), LIST_GROUPS(16, "ListGroups", ListGroupsRequestData.SCHEMAS, ListGroupsResponseData.SCHEMAS), SASL_HANDSHAKE(17, "SaslHandshake", SaslHandshakeRequestData.SCHEMAS, SaslHandshakeResponseData.SCHEMAS), - API_VERSIONS(18, "ApiVersions", ApiVersionsRequestData.SCHEMAS, ApiVersionsResponseData.SCHEMAS) { - @Override - public Struct parseResponse(short version, ByteBuffer buffer) { - // Fallback to version 0 for ApiVersions response. If a client sends an ApiVersionsRequest - // using a version higher than that supported by the broker, a version 0 response is sent - // to the client indicating UNSUPPORTED_VERSION. - return parseResponse(version, buffer, (short) 0); - } - }, + API_VERSIONS(18, "ApiVersions", ApiVersionsRequestData.SCHEMAS, ApiVersionsResponseData.SCHEMAS), CREATE_TOPICS(19, "CreateTopics", CreateTopicsRequestData.SCHEMAS, CreateTopicsResponseData.SCHEMAS, true), DELETE_TOPICS(20, "DeleteTopics", DeleteTopicsRequestData.SCHEMAS, DeleteTopicsResponseData.SCHEMAS, true), DELETE_RECORDS(21, "DeleteRecords", DeleteRecordsRequestData.SCHEMAS, DeleteRecordsResponseData.SCHEMAS), @@ -379,41 +368,6 @@ public short oldestVersion() { return 0; } - public Schema requestSchema(short version) { - return schemaFor(requestSchemas, version); - } - - public Schema responseSchema(short version) { - return schemaFor(responseSchemas, version); - } - - public Struct parseRequest(short version, ByteBuffer buffer) { - return requestSchema(version).read(buffer); - } - - public Struct parseResponse(short version, ByteBuffer buffer) { - return responseSchema(version).read(buffer); - } - - protected Struct parseResponse(short version, ByteBuffer buffer, short fallbackVersion) { - int bufferPosition = buffer.position(); - try { - return responseSchema(version).read(buffer); - } catch (SchemaException e) { - if (version != fallbackVersion) { - buffer.position(bufferPosition); - return responseSchema(fallbackVersion).read(buffer); - } else - throw e; - } - } - - private Schema schemaFor(Schema[] versions, short version) { - if (!isVersionSupported(version)) - throw new IllegalArgumentException("Invalid version for API key " + this + ": " + version); - return versions[version]; - } - public boolean isVersionSupported(short apiVersion) { return apiVersion >= oldestVersion() && apiVersion <= latestVersion(); } diff --git a/clients/src/test/java/org/apache/kafka/common/protocol/ApiKeysTest.java b/clients/src/test/java/org/apache/kafka/common/protocol/ApiKeysTest.java index 78b59a5b542d4..3845aa8aeaf2e 100644 --- a/clients/src/test/java/org/apache/kafka/common/protocol/ApiKeysTest.java +++ b/clients/src/test/java/org/apache/kafka/common/protocol/ApiKeysTest.java @@ -16,16 +16,8 @@ */ package org.apache.kafka.common.protocol; -import org.apache.kafka.common.protocol.types.BoundField; -import org.apache.kafka.common.protocol.types.Schema; import org.junit.Test; -import java.util.Arrays; -import java.util.List; - -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertNull; - public class ApiKeysTest { @Test(expected = IllegalArgumentException.class) @@ -38,31 +30,4 @@ public void testForIdWithInvalidIdHigh() { ApiKeys.forId(10000); } - @Test(expected = IllegalArgumentException.class) - public void schemaVersionOutOfRange() { - ApiKeys.PRODUCE.requestSchema((short) ApiKeys.PRODUCE.requestSchemas.length); - } - - /** - * All valid client responses which may be throttled should have a field named - * 'throttle_time_ms' to return the throttle time to the client. Exclusions are - *