Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -136,12 +136,9 @@
import org.apache.kafka.common.message.WriteTxnMarkersRequestData;
import org.apache.kafka.common.message.WriteTxnMarkersResponseData;
import org.apache.kafka.common.protocol.types.Schema;
import org.apache.kafka.common.protocol.types.SchemaException;
import org.apache.kafka.common.protocol.types.Struct;
import org.apache.kafka.common.protocol.types.Type;
import org.apache.kafka.common.record.RecordBatch;

import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
Expand Down Expand Up @@ -178,15 +175,7 @@ public enum ApiKeys {
DescribeGroupsResponseData.SCHEMAS),
LIST_GROUPS(16, "ListGroups", ListGroupsRequestData.SCHEMAS, ListGroupsResponseData.SCHEMAS),
SASL_HANDSHAKE(17, "SaslHandshake", SaslHandshakeRequestData.SCHEMAS, SaslHandshakeResponseData.SCHEMAS),
API_VERSIONS(18, "ApiVersions", ApiVersionsRequestData.SCHEMAS, ApiVersionsResponseData.SCHEMAS) {
@Override
public Struct parseResponse(short version, ByteBuffer buffer) {
// Fallback to version 0 for ApiVersions response. If a client sends an ApiVersionsRequest
// using a version higher than that supported by the broker, a version 0 response is sent
// to the client indicating UNSUPPORTED_VERSION.
return parseResponse(version, buffer, (short) 0);
Comment on lines 184 to 187
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@dengziming Do we maintain this fallback mechanism somewhere?

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

}
},
API_VERSIONS(18, "ApiVersions", ApiVersionsRequestData.SCHEMAS, ApiVersionsResponseData.SCHEMAS),
CREATE_TOPICS(19, "CreateTopics", CreateTopicsRequestData.SCHEMAS, CreateTopicsResponseData.SCHEMAS, true),
DELETE_TOPICS(20, "DeleteTopics", DeleteTopicsRequestData.SCHEMAS, DeleteTopicsResponseData.SCHEMAS, true),
DELETE_RECORDS(21, "DeleteRecords", DeleteRecordsRequestData.SCHEMAS, DeleteRecordsResponseData.SCHEMAS),
Expand Down Expand Up @@ -379,41 +368,6 @@ public short oldestVersion() {
return 0;
}

public Schema requestSchema(short version) {
return schemaFor(requestSchemas, version);
}

public Schema responseSchema(short version) {
return schemaFor(responseSchemas, version);
}

public Struct parseRequest(short version, ByteBuffer buffer) {
return requestSchema(version).read(buffer);
}

public Struct parseResponse(short version, ByteBuffer buffer) {
return responseSchema(version).read(buffer);
}

protected Struct parseResponse(short version, ByteBuffer buffer, short fallbackVersion) {
int bufferPosition = buffer.position();
try {
return responseSchema(version).read(buffer);
} catch (SchemaException e) {
if (version != fallbackVersion) {
buffer.position(bufferPosition);
return responseSchema(fallbackVersion).read(buffer);
} else
throw e;
}
}

private Schema schemaFor(Schema[] versions, short version) {
if (!isVersionSupported(version))
throw new IllegalArgumentException("Invalid version for API key " + this + ": " + version);
return versions[version];
}

public boolean isVersionSupported(short apiVersion) {
return apiVersion >= oldestVersion() && apiVersion <= latestVersion();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,16 +16,8 @@
*/
package org.apache.kafka.common.protocol;

import org.apache.kafka.common.protocol.types.BoundField;
import org.apache.kafka.common.protocol.types.Schema;
import org.junit.Test;

import java.util.Arrays;
import java.util.List;

import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;

public class ApiKeysTest {

@Test(expected = IllegalArgumentException.class)
Expand All @@ -38,31 +30,4 @@ public void testForIdWithInvalidIdHigh() {
ApiKeys.forId(10000);
}

@Test(expected = IllegalArgumentException.class)
public void schemaVersionOutOfRange() {
ApiKeys.PRODUCE.requestSchema((short) ApiKeys.PRODUCE.requestSchemas.length);
}

/**
* All valid client responses which may be throttled should have a field named
* 'throttle_time_ms' to return the throttle time to the client. Exclusions are
* <ul>
* <li> Cluster actions used only for inter-broker are throttled only if unauthorized
* <li> SASL_HANDSHAKE and SASL_AUTHENTICATE are not throttled when used for authentication
* when a connection is established or for re-authentication thereafter; these requests
* return an error response that may be throttled if they are sent otherwise.
* </ul>
*/
@Test
public void testResponseThrottleTime() {
List<ApiKeys> authenticationKeys = Arrays.asList(ApiKeys.SASL_HANDSHAKE, ApiKeys.SASL_AUTHENTICATE);
for (ApiKeys apiKey: ApiKeys.values()) {
Schema responseSchema = apiKey.responseSchema(apiKey.latestVersion());
BoundField throttleTimeField = responseSchema.get(CommonFields.THROTTLE_TIME_MS.name);
if (apiKey.clusterAction || authenticationKeys.contains(apiKey))
assertNull("Unexpected throttle time field: " + apiKey, throttleTimeField);
else
assertNotNull("Throttle time field missing: " + apiKey, throttleTimeField);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,15 @@

import org.apache.kafka.common.message.ApiVersionsResponseData.ApiVersionsResponseKey;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.CommonFields;
import org.apache.kafka.common.protocol.types.BoundField;
import org.apache.kafka.common.protocol.types.Schema;
import org.junit.Test;

import java.util.Arrays;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

import static org.junit.Assert.assertEquals;
Expand Down Expand Up @@ -69,6 +74,29 @@ public void shouldHaveCorrectDefaultApiVersionsResponse() {
assertEquals(ApiVersionsResponse.UNKNOWN_FINALIZED_FEATURES_EPOCH, ApiVersionsResponse.DEFAULT_API_VERSIONS_RESPONSE.data.finalizedFeaturesEpoch());
}

/**
* All valid client responses which may be throttled should have a field named
* 'throttle_time_ms' to return the throttle time to the client. Exclusions are
* <ul>
* <li> Cluster actions used only for inter-broker are throttled only if unauthorized
* <li> SASL_HANDSHAKE and SASL_AUTHENTICATE are not throttled when used for authentication
* when a connection is established or for re-authentication thereafter; these requests
* return an error response that may be throttled if they are sent otherwise.
* </ul>
*/
@Test
public void testResponseThrottleTime() {
List<ApiKeys> authenticationKeys = Arrays.asList(ApiKeys.SASL_HANDSHAKE, ApiKeys.SASL_AUTHENTICATE);
for (ApiKeys apiKey: ApiKeys.values()) {
Schema responseSchema = apiKey.responseSchemas[apiKey.latestVersion()];
BoundField throttleTimeField = responseSchema.get(CommonFields.THROTTLE_TIME_MS.name);
if (apiKey.clusterAction || authenticationKeys.contains(apiKey))
assertNull("Unexpected throttle time field: " + apiKey, throttleTimeField);
else
assertNotNull("Throttle time field missing: " + apiKey, throttleTimeField);
}
}


private Set<ApiKeys> apiKeysInResponse(final ApiVersionsResponse apiVersions) {
final Set<ApiKeys> apiKeys = new HashSet<>();
Expand Down