Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
import org.apache.kafka.common.errors.AuthenticationException;
import org.apache.kafka.common.errors.DisconnectException;
import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.apache.kafka.common.message.ApiVersionsResponseData.ApiVersionsResponseKey;
import org.apache.kafka.common.message.ApiVersionsResponseData.ApiVersion;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.network.ChannelState;
import org.apache.kafka.common.network.NetworkSend;
Expand Down Expand Up @@ -905,7 +905,7 @@ private void handleApiVersionsResponse(List<ClientResponse> responses,
// If not provided, the client falls back to version 0.
short maxApiVersion = 0;
if (apiVersionsResponse.data().apiKeys().size() > 0) {
ApiVersionsResponseKey apiVersion = apiVersionsResponse.data().apiKeys().find(ApiKeys.API_VERSIONS.id);
ApiVersion apiVersion = apiVersionsResponse.data().apiKeys().find(ApiKeys.API_VERSIONS.id);
if (apiVersion != null) {
maxApiVersion = apiVersion.maxVersion();
}
Expand Down
64 changes: 34 additions & 30 deletions clients/src/main/java/org/apache/kafka/clients/NodeApiVersions.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,10 @@
import java.util.Collections;
import java.util.LinkedList;
import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.apache.kafka.common.message.ApiVersionsResponseData.ApiVersionsResponseKey;
import org.apache.kafka.common.message.ApiVersionsResponseData.ApiVersionsResponseKeyCollection;
import org.apache.kafka.common.message.ApiVersionsResponseData.ApiVersion;
import org.apache.kafka.common.message.ApiVersionsResponseData.ApiVersionCollection;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.ApiVersion;
import org.apache.kafka.common.requests.ApiVersionsResponse;
import org.apache.kafka.common.utils.Utils;

import java.util.ArrayList;
Expand All @@ -50,7 +50,7 @@ public class NodeApiVersions {
* @return A new NodeApiVersions object.
*/
public static NodeApiVersions create() {
return create(Collections.<ApiVersion>emptyList());
return create(Collections.emptyList());
}

/**
Expand All @@ -65,14 +65,12 @@ public static NodeApiVersions create(Collection<ApiVersion> overrides) {
for (ApiKeys apiKey : ApiKeys.enabledApis()) {
boolean exists = false;
for (ApiVersion apiVersion : apiVersions) {
if (apiVersion.apiKey == apiKey.id) {
if (apiVersion.apiKey() == apiKey.id) {
exists = true;
break;
}
}
if (!exists) {
apiVersions.add(new ApiVersion(apiKey));
}
if (!exists) apiVersions.add(ApiVersionsResponse.toApiVersion(apiKey));
}
return new NodeApiVersions(apiVersions);
}
Expand All @@ -87,25 +85,28 @@ public static NodeApiVersions create(Collection<ApiVersion> overrides) {
* @return A new NodeApiVersions object.
*/
public static NodeApiVersions create(short apiKey, short minVersion, short maxVersion) {
return create(Collections.singleton(new ApiVersion(apiKey, minVersion, maxVersion)));
return create(Collections.singleton(new ApiVersion()
.setApiKey(apiKey)
.setMinVersion(minVersion)
.setMaxVersion(maxVersion)));
}

public NodeApiVersions(ApiVersionsResponseKeyCollection nodeApiVersions) {
for (ApiVersionsResponseKey nodeApiVersion : nodeApiVersions) {
public NodeApiVersions(ApiVersionCollection nodeApiVersions) {
for (ApiVersion nodeApiVersion : nodeApiVersions) {
if (ApiKeys.hasId(nodeApiVersion.apiKey())) {
ApiKeys nodeApiKey = ApiKeys.forId(nodeApiVersion.apiKey());
supportedVersions.put(nodeApiKey, new ApiVersion(nodeApiVersion));
supportedVersions.put(nodeApiKey, nodeApiVersion);
} else {
// Newer brokers may support ApiKeys we don't know about
unknownApis.add(new ApiVersion(nodeApiVersion));
unknownApis.add(nodeApiVersion);
}
}
}

public NodeApiVersions(Collection<ApiVersion> nodeApiVersions) {
for (ApiVersion nodeApiVersion : nodeApiVersions) {
if (ApiKeys.hasId(nodeApiVersion.apiKey)) {
ApiKeys nodeApiKey = ApiKeys.forId(nodeApiVersion.apiKey);
if (ApiKeys.hasId(nodeApiVersion.apiKey())) {
ApiKeys nodeApiKey = ApiKeys.forId(nodeApiVersion.apiKey());
supportedVersions.put(nodeApiKey, nodeApiVersion);
} else {
// Newer brokers may support ApiKeys we don't know about
Expand All @@ -128,15 +129,18 @@ public short latestUsableVersion(ApiKeys apiKey, short oldestAllowedVersion, sho
if (!supportedVersions.containsKey(apiKey))
throw new UnsupportedVersionException("The broker does not support " + apiKey);
ApiVersion supportedVersion = supportedVersions.get(apiKey);
Optional<ApiVersion> intersectVersion = supportedVersion.intersect(
new ApiVersion(apiKey.id, oldestAllowedVersion, latestAllowedVersion));
Optional<ApiVersion> intersectVersion = ApiVersionsResponse.intersect(supportedVersion,
new ApiVersion()
.setApiKey(apiKey.id)
.setMinVersion(oldestAllowedVersion)
.setMaxVersion(latestAllowedVersion));

if (intersectVersion.isPresent())
return intersectVersion.get().maxVersion;
return intersectVersion.get().maxVersion();
else
throw new UnsupportedVersionException("The broker does not support " + apiKey +
" with version in range [" + oldestAllowedVersion + "," + latestAllowedVersion + "]. The supported" +
" range is [" + supportedVersion.minVersion + "," + supportedVersion.maxVersion + "].");
" range is [" + supportedVersion.minVersion() + "," + supportedVersion.maxVersion() + "].");
}

/**
Expand All @@ -160,9 +164,9 @@ public String toString(boolean lineBreaks) {
// ascending order.
TreeMap<Short, String> apiKeysText = new TreeMap<>();
for (ApiVersion supportedVersion : this.supportedVersions.values())
apiKeysText.put(supportedVersion.apiKey, apiVersionToText(supportedVersion));
apiKeysText.put(supportedVersion.apiKey(), apiVersionToText(supportedVersion));
for (ApiVersion apiVersion : unknownApis)
apiKeysText.put(apiVersion.apiKey, apiVersionToText(apiVersion));
apiKeysText.put(apiVersion.apiKey(), apiVersionToText(apiVersion));

// Also handle the case where some apiKey types are not specified at all in the given ApiVersions,
// which may happen when the remote is too old.
Expand All @@ -189,27 +193,27 @@ public String toString(boolean lineBreaks) {
private String apiVersionToText(ApiVersion apiVersion) {
StringBuilder bld = new StringBuilder();
ApiKeys apiKey = null;
if (ApiKeys.hasId(apiVersion.apiKey)) {
apiKey = ApiKeys.forId(apiVersion.apiKey);
if (ApiKeys.hasId(apiVersion.apiKey())) {
apiKey = ApiKeys.forId(apiVersion.apiKey());
bld.append(apiKey.name).append("(").append(apiKey.id).append("): ");
} else {
bld.append("UNKNOWN(").append(apiVersion.apiKey).append("): ");
bld.append("UNKNOWN(").append(apiVersion.apiKey()).append("): ");
}

if (apiVersion.minVersion == apiVersion.maxVersion) {
bld.append(apiVersion.minVersion);
if (apiVersion.minVersion() == apiVersion.maxVersion()) {
bld.append(apiVersion.minVersion());
} else {
bld.append(apiVersion.minVersion).append(" to ").append(apiVersion.maxVersion);
bld.append(apiVersion.minVersion()).append(" to ").append(apiVersion.maxVersion());
}

if (apiKey != null) {
ApiVersion supportedVersion = supportedVersions.get(apiKey);
if (apiKey.latestVersion() < supportedVersion.minVersion) {
if (apiKey.latestVersion() < supportedVersion.minVersion()) {
bld.append(" [unusable: node too new]");
} else if (supportedVersion.maxVersion < apiKey.oldestVersion()) {
} else if (supportedVersion.maxVersion() < apiKey.oldestVersion()) {
bld.append(" [unusable: node too old]");
} else {
short latestUsableVersion = Utils.min(apiKey.latestVersion(), supportedVersion.maxVersion);
short latestUsableVersion = Utils.min(apiKey.latestVersion(), supportedVersion.maxVersion());
bld.append(" [usable: ").append(latestUsableVersion).append("]");
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
*/
package org.apache.kafka.clients.consumer.internals;

import org.apache.kafka.common.protocol.ApiVersion;
import org.apache.kafka.clients.ApiVersions;
import org.apache.kafka.clients.ClientResponse;
import org.apache.kafka.clients.FetchSessionHandler;
Expand Down Expand Up @@ -48,6 +47,7 @@
import org.apache.kafka.common.errors.TopicAuthorizationException;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.header.internals.RecordHeaders;
import org.apache.kafka.common.message.ApiVersionsResponseData.ApiVersion;
import org.apache.kafka.common.message.ListOffsetsRequestData.ListOffsetsPartition;
import org.apache.kafka.common.message.ListOffsetsResponseData.ListOffsetsPartitionResponse;
import org.apache.kafka.common.message.ListOffsetsResponseData.ListOffsetsTopicResponse;
Expand Down Expand Up @@ -774,7 +774,7 @@ static boolean hasUsableOffsetForLeaderEpochVersion(NodeApiVersions nodeApiVersi
if (apiVersion == null)
return false;

return OffsetsForLeaderEpochRequest.supportsTopicPermission(apiVersion.maxVersion);
return OffsetsForLeaderEpochRequest.supportsTopicPermission(apiVersion.maxVersion());
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
*/
package org.apache.kafka.clients.producer.internals;

import org.apache.kafka.common.protocol.ApiVersion;
import org.apache.kafka.clients.ApiVersions;
import org.apache.kafka.clients.ClientResponse;
import org.apache.kafka.clients.NodeApiVersions;
Expand All @@ -28,6 +27,7 @@
import org.apache.kafka.common.errors.InvalidProducerEpochException;
import org.apache.kafka.common.errors.RetriableException;
import org.apache.kafka.common.errors.UnknownProducerIdException;
import org.apache.kafka.common.message.ApiVersionsResponseData.ApiVersion;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.utils.ProducerIdAndEpoch;
import org.apache.kafka.common.KafkaException;
Expand Down Expand Up @@ -1051,7 +1051,7 @@ void handleCoordinatorReady() {
nodeApiVersions.apiVersion(ApiKeys.INIT_PRODUCER_ID) :
null;
this.coordinatorSupportsBumpingEpoch = initProducerIdVersion != null &&
initProducerIdVersion.maxVersion >= 3;
initProducerIdVersion.maxVersion() >= 3;
}

private void transitionTo(State target) {
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,7 @@
import java.util.regex.Pattern;
import org.apache.kafka.common.message.ApiVersionsRequestData;
import org.apache.kafka.common.message.ApiVersionsResponseData;
import org.apache.kafka.common.message.ApiVersionsResponseData.ApiVersionsResponseKey;
import org.apache.kafka.common.message.ApiVersionsResponseData.ApiVersionsResponseKeyCollection;
import org.apache.kafka.common.message.ApiVersionsResponseData.ApiVersionCollection;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.ByteBufferAccessor;
import org.apache.kafka.common.protocol.Errors;
Expand Down Expand Up @@ -108,11 +107,8 @@ public ApiVersionsResponse getErrorResponse(int throttleTimeMs, Throwable e) {
// Starting from Apache Kafka 2.4 (KIP-511), ApiKeys field is populated with the supported
// versions of the ApiVersionsRequest when an UNSUPPORTED_VERSION error is returned.
if (Errors.forException(e) == Errors.UNSUPPORTED_VERSION) {
ApiVersionsResponseKeyCollection apiKeys = new ApiVersionsResponseKeyCollection();
apiKeys.add(new ApiVersionsResponseKey()
.setApiKey(ApiKeys.API_VERSIONS.id)
.setMinVersion(ApiKeys.API_VERSIONS.oldestVersion())
.setMaxVersion(ApiKeys.API_VERSIONS.latestVersion()));
ApiVersionCollection apiKeys = new ApiVersionCollection();
apiKeys.add(ApiVersionsResponse.toApiVersion(ApiKeys.API_VERSIONS));
data.setApiKeys(apiKeys);
}

Expand Down
Loading