diff --git a/build.gradle b/build.gradle
index 975d56b4881ed..e372ad1d48152 100644
--- a/build.gradle
+++ b/build.gradle
@@ -1553,7 +1553,7 @@ project(':jmh-benchmarks') {
doFirst {
if (System.getProperty("jmhArgs")) {
- args System.getProperty("jmhArgs").split(',')
+ args System.getProperty("jmhArgs").split(' ')
}
args = [shadowJar.archivePath, *args]
}
diff --git a/checkstyle/import-control-jmh-benchmarks.xml b/checkstyle/import-control-jmh-benchmarks.xml
index 5ada90af26329..d2c60e9e0f1e7 100644
--- a/checkstyle/import-control-jmh-benchmarks.xml
+++ b/checkstyle/import-control-jmh-benchmarks.xml
@@ -36,6 +36,10 @@
+
+
+
+
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java
index 4fee5636f522f..0e57e78711fa0 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java
@@ -478,6 +478,27 @@ public static MetadataResponse prepareResponse(Collection<Node> brokers, String
return prepareResponse(AbstractResponse.DEFAULT_THROTTLE_TIME, brokers, clusterId, controllerId, topicMetadata);
}
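+ /**
+ * Overload that takes already-built MetadataResponseData.MetadataResponseTopic entries, so callers
+ * (e.g. KafkaApis) can populate MetadataResponseData directly instead of converting through the
+ * legacy TopicMetadata/PartitionMetadata wrappers.
+ */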
+ public static MetadataResponse prepareResponse(int throttleTimeMs, List<MetadataResponseTopic> topicMetadataList,
+ Collection<Node> brokers, String clusterId, int controllerId,
+ int clusterAuthorizedOperations) {
+ MetadataResponseData responseData = new MetadataResponseData();
+ responseData.setThrottleTimeMs(throttleTimeMs);
+ brokers.forEach(broker ->
+ responseData.brokers().add(new MetadataResponseBroker()
+ .setNodeId(broker.id())
+ .setHost(broker.host())
+ .setPort(broker.port())
+ .setRack(broker.rack()))
+ );
+
+ responseData.setClusterId(clusterId);
+ responseData.setControllerId(controllerId);
+ responseData.setClusterAuthorizedOperations(clusterAuthorizedOperations);
+
+ topicMetadataList.forEach(topicMetadata -> responseData.topics().add(topicMetadata));
+ return new MetadataResponse(responseData);
+ }
+
@Override
public boolean shouldClientThrottle(short version) {
return version >= 6;
diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
index 28b46401e0b96..9731476a3626b 100644
--- a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
@@ -82,6 +82,8 @@
import org.apache.kafka.common.message.LeaveGroupResponseData.MemberResponse;
import org.apache.kafka.common.message.ListGroupsResponseData;
import org.apache.kafka.common.message.ListPartitionReassignmentsResponseData;
+import org.apache.kafka.common.message.MetadataResponseData.MetadataResponseTopic;
+import org.apache.kafka.common.message.MetadataResponseData.MetadataResponsePartition;
import org.apache.kafka.common.message.OffsetDeleteResponseData;
import org.apache.kafka.common.message.OffsetDeleteResponseData.OffsetDeleteResponsePartition;
import org.apache.kafka.common.message.OffsetDeleteResponseData.OffsetDeleteResponsePartitionCollection;
@@ -112,8 +114,6 @@
import org.apache.kafka.common.requests.ListPartitionReassignmentsResponse;
import org.apache.kafka.common.requests.MetadataRequest;
import org.apache.kafka.common.requests.MetadataResponse;
-import org.apache.kafka.common.requests.MetadataResponse.PartitionMetadata;
-import org.apache.kafka.common.requests.MetadataResponse.TopicMetadata;
import org.apache.kafka.common.requests.OffsetCommitResponse;
import org.apache.kafka.common.requests.OffsetDeleteResponse;
import org.apache.kafka.common.requests.OffsetFetchResponse;
@@ -342,28 +342,33 @@ private static FindCoordinatorResponse prepareFindCoordinatorResponse(Errors err
}
private static MetadataResponse prepareMetadataResponse(Cluster cluster, Errors error) {
- List<TopicMetadata> metadata = new ArrayList<>();
+ List<MetadataResponseTopic> metadata = new ArrayList<>();
for (String topic : cluster.topics()) {
- List<PartitionMetadata> pms = new ArrayList<>();
+ List<MetadataResponsePartition> pms = new ArrayList<>();
for (PartitionInfo pInfo : cluster.availablePartitionsForTopic(topic)) {
- PartitionMetadata pm = new PartitionMetadata(error,
- new TopicPartition(topic, pInfo.partition()),
- Optional.of(pInfo.leader().id()),
- Optional.of(234),
- Arrays.stream(pInfo.replicas()).map(Node::id).collect(Collectors.toList()),
- Arrays.stream(pInfo.inSyncReplicas()).map(Node::id).collect(Collectors.toList()),
- Arrays.stream(pInfo.offlineReplicas()).map(Node::id).collect(Collectors.toList()));
+ MetadataResponsePartition pm = new MetadataResponsePartition()
+ .setErrorCode(error.code())
+ .setPartitionIndex(pInfo.partition())
+ .setLeaderId(pInfo.leader().id())
+ .setLeaderEpoch(234)
+ .setReplicaNodes(Arrays.stream(pInfo.replicas()).map(Node::id).collect(Collectors.toList()))
+ .setIsrNodes(Arrays.stream(pInfo.inSyncReplicas()).map(Node::id).collect(Collectors.toList()))
+ .setOfflineReplicas(Arrays.stream(pInfo.offlineReplicas()).map(Node::id).collect(Collectors.toList()));
pms.add(pm);
}
- TopicMetadata tm = new TopicMetadata(error, topic, false, pms);
+ MetadataResponseTopic tm = new MetadataResponseTopic()
+ .setErrorCode(error.code())
+ .setName(topic)
+ .setIsInternal(false)
+ .setPartitions(pms);
metadata.add(tm);
}
return MetadataResponse.prepareResponse(0,
- cluster.nodes(),
- cluster.clusterResource().clusterId(),
- cluster.controller().id(),
- metadata,
- MetadataResponse.AUTHORIZED_OPERATIONS_OMITTED);
+ metadata,
+ cluster.nodes(),
+ cluster.clusterResource().clusterId(),
+ cluster.controller().id(),
+ MetadataResponse.AUTHORIZED_OPERATIONS_OMITTED);
}
/**
@@ -1107,19 +1112,19 @@ public void testDescribeCluster() throws Exception {
// Prepare the metadata response used for the first describe cluster
MetadataResponse response = MetadataResponse.prepareResponse(0,
+ Collections.emptyList(),
env.cluster().nodes(),
env.cluster().clusterResource().clusterId(),
2,
- Collections.emptyList(),
MetadataResponse.AUTHORIZED_OPERATIONS_OMITTED);
env.kafkaClient().prepareResponse(response);
// Prepare the metadata response used for the second describe cluster
MetadataResponse response2 = MetadataResponse.prepareResponse(0,
+ Collections.emptyList(),
env.cluster().nodes(),
env.cluster().clusterResource().clusterId(),
3,
- Collections.emptyList(),
1 << AclOperation.DESCRIBE.code() | 1 << AclOperation.ALTER.code());
env.kafkaClient().prepareResponse(response2);
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index 8a14cd79c9f1b..2f275024e7983 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -76,6 +76,8 @@ import org.apache.kafka.common.security.auth.{KafkaPrincipal, SecurityProtocol}
import org.apache.kafka.common.security.token.delegation.{DelegationToken, TokenInformation}
import org.apache.kafka.common.utils.{ProducerIdAndEpoch, Time, Utils}
import org.apache.kafka.common.{Node, TopicPartition}
+import org.apache.kafka.common.message.MetadataResponseData
+import org.apache.kafka.common.message.MetadataResponseData.{MetadataResponsePartition, MetadataResponseTopic}
import org.apache.kafka.server.authorizer._
import scala.compat.java8.OptionConverters._
@@ -1000,24 +1002,30 @@ class KafkaApis(val requestChannel: RequestChannel,
private def createTopic(topic: String,
numPartitions: Int,
replicationFactor: Int,
- properties: util.Properties = new util.Properties()): MetadataResponse.TopicMetadata = {
+ properties: util.Properties = new util.Properties()): MetadataResponseTopic = {
try {
adminZkClient.createTopic(topic, numPartitions, replicationFactor, properties, RackAwareMode.Safe)
info("Auto creation of topic %s with %d partitions and replication factor %d is successful"
.format(topic, numPartitions, replicationFactor))
- new MetadataResponse.TopicMetadata(Errors.LEADER_NOT_AVAILABLE, topic, isInternal(topic),
- util.Collections.emptyList())
+ metadataResponseTopic(Errors.LEADER_NOT_AVAILABLE, topic, isInternal(topic), util.Collections.emptyList())
} catch {
case _: TopicExistsException => // let it go, possibly another broker created this topic
- new MetadataResponse.TopicMetadata(Errors.LEADER_NOT_AVAILABLE, topic, isInternal(topic),
- util.Collections.emptyList())
+ metadataResponseTopic(Errors.LEADER_NOT_AVAILABLE, topic, isInternal(topic), util.Collections.emptyList())
case ex: Throwable => // Catch all to prevent unhandled errors
- new MetadataResponse.TopicMetadata(Errors.forException(ex), topic, isInternal(topic),
- util.Collections.emptyList())
+ metadataResponseTopic(Errors.forException(ex), topic, isInternal(topic), util.Collections.emptyList())
}
}
- private def createInternalTopic(topic: String): MetadataResponse.TopicMetadata = {
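+ // Builds a MetadataResponseTopic from an error, topic name, internal flag and partition list,
+ // replacing the removed MetadataResponse.TopicMetadata constructor calls.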
+ private def metadataResponseTopic(error: Errors, topic: String, isInternal: Boolean,
+ partitionData: util.List[MetadataResponsePartition]): MetadataResponseTopic = {
+ new MetadataResponseTopic()
+ .setErrorCode(error.code)
+ .setName(topic)
+ .setIsInternal(isInternal)
+ .setPartitions(partitionData)
+ }
+
+ private def createInternalTopic(topic: String): MetadataResponseTopic = {
if (topic == null)
throw new IllegalArgumentException("topic must not be null")
@@ -1030,7 +1038,7 @@ class KafkaApis(val requestChannel: RequestChannel,
s"'${config.offsetsTopicReplicationFactor}' for the offsets topic (configured via " +
s"'${KafkaConfig.OffsetsTopicReplicationFactorProp}'). This error can be ignored if the cluster is starting up " +
s"and not all brokers are up yet.")
- new MetadataResponse.TopicMetadata(Errors.COORDINATOR_NOT_AVAILABLE, topic, true, util.Collections.emptyList())
+ metadataResponseTopic(Errors.COORDINATOR_NOT_AVAILABLE, topic, true, util.Collections.emptyList())
} else {
createTopic(topic, config.offsetsTopicPartitions, config.offsetsTopicReplicationFactor.toInt,
groupCoordinator.offsetsTopicConfigs)
@@ -1041,7 +1049,7 @@ class KafkaApis(val requestChannel: RequestChannel,
s"'${config.transactionTopicReplicationFactor}' for the transactions state topic (configured via " +
s"'${KafkaConfig.TransactionsTopicReplicationFactorProp}'). This error can be ignored if the cluster is starting up " +
s"and not all brokers are up yet.")
- new MetadataResponse.TopicMetadata(Errors.COORDINATOR_NOT_AVAILABLE, topic, true, util.Collections.emptyList())
+ metadataResponseTopic(Errors.COORDINATOR_NOT_AVAILABLE, topic, true, util.Collections.emptyList())
} else {
createTopic(topic, config.transactionTopicPartitions, config.transactionTopicReplicationFactor.toInt,
txnCoordinator.transactionTopicConfigs)
@@ -1050,31 +1058,31 @@ class KafkaApis(val requestChannel: RequestChannel,
}
}
- private def getOrCreateInternalTopic(topic: String, listenerName: ListenerName): MetadataResponse.TopicMetadata = {
+ private def getOrCreateInternalTopic(topic: String, listenerName: ListenerName): MetadataResponseData.MetadataResponseTopic = {
val topicMetadata = metadataCache.getTopicMetadata(Set(topic), listenerName)
topicMetadata.headOption.getOrElse(createInternalTopic(topic))
}
private def getTopicMetadata(allowAutoTopicCreation: Boolean, topics: Set[String], listenerName: ListenerName,
errorUnavailableEndpoints: Boolean,
- errorUnavailableListeners: Boolean): Seq[MetadataResponse.TopicMetadata] = {
+ errorUnavailableListeners: Boolean): Seq[MetadataResponseTopic] = {
val topicResponses = metadataCache.getTopicMetadata(topics, listenerName,
errorUnavailableEndpoints, errorUnavailableListeners)
if (topics.isEmpty || topicResponses.size == topics.size) {
topicResponses
} else {
- val nonExistentTopics = topics -- topicResponses.map(_.topic).toSet
+ val nonExistentTopics = topics -- topicResponses.map(_.name).toSet
val responsesForNonExistentTopics = nonExistentTopics.map { topic =>
if (isInternal(topic)) {
val topicMetadata = createInternalTopic(topic)
- if (topicMetadata.error == Errors.COORDINATOR_NOT_AVAILABLE)
- new MetadataResponse.TopicMetadata(Errors.INVALID_REPLICATION_FACTOR, topic, true, util.Collections.emptyList())
+ if (topicMetadata.errorCode == Errors.COORDINATOR_NOT_AVAILABLE.code)
+ metadataResponseTopic(Errors.INVALID_REPLICATION_FACTOR, topic, true, util.Collections.emptyList())
else
topicMetadata
} else if (allowAutoTopicCreation && config.autoCreateTopicsEnable) {
createTopic(topic, config.numPartitions, config.defaultReplicationFactor)
} else {
- new MetadataResponse.TopicMetadata(Errors.UNKNOWN_TOPIC_OR_PARTITION, topic, false, util.Collections.emptyList())
+ metadataResponseTopic(Errors.UNKNOWN_TOPIC_OR_PARTITION, topic, false, util.Collections.emptyList())
}
}
topicResponses ++ responsesForNonExistentTopics
@@ -1109,17 +1117,17 @@ class KafkaApis(val requestChannel: RequestChannel,
}
val unauthorizedForCreateTopicMetadata = unauthorizedForCreateTopics.map(topic =>
- new MetadataResponse.TopicMetadata(Errors.TOPIC_AUTHORIZATION_FAILED, topic, isInternal(topic),
- util.Collections.emptyList()))
+ metadataResponseTopic(Errors.TOPIC_AUTHORIZATION_FAILED, topic, isInternal(topic), util.Collections.emptyList()))
+
// do not disclose the existence of topics unauthorized for Describe, so we've not even checked if they exist or not
val unauthorizedForDescribeTopicMetadata =
// In case of all topics, don't include topics unauthorized for Describe
if ((requestVersion == 0 && (metadataRequest.topics == null || metadataRequest.topics.isEmpty)) || metadataRequest.isAllTopics)
- Set.empty[MetadataResponse.TopicMetadata]
+ Set.empty[MetadataResponseTopic]
else
unauthorizedForDescribeTopics.map(topic =>
- new MetadataResponse.TopicMetadata(Errors.TOPIC_AUTHORIZATION_FAILED, topic, false, util.Collections.emptyList()))
+ metadataResponseTopic(Errors.TOPIC_AUTHORIZATION_FAILED, topic, false, util.Collections.emptyList()))
// In version 0, we returned an error when brokers with replicas were unavailable,
// while in higher versions we simply don't include the broker in the returned broker list
@@ -1129,7 +1137,7 @@ class KafkaApis(val requestChannel: RequestChannel,
val errorUnavailableListeners = requestVersion >= 6
val topicMetadata =
if (authorizedTopics.isEmpty)
- Seq.empty[MetadataResponse.TopicMetadata]
+ Seq.empty[MetadataResponseTopic]
else
getTopicMetadata(metadataRequest.allowAutoTopicCreation, authorizedTopics, request.context.listenerName,
errorUnavailableEndpoints, errorUnavailableListeners)
@@ -1147,7 +1155,7 @@ class KafkaApis(val requestChannel: RequestChannel,
// get topic authorized operations
if (metadataRequest.data.includeTopicAuthorizedOperations) {
topicMetadata.foreach { topicData =>
- topicData.authorizedOperations(authorizedOperations(request, new Resource(ResourceType.TOPIC, topicData.topic)))
+ topicData.setTopicAuthorizedOperations(authorizedOperations(request, new Resource(ResourceType.TOPIC, topicData.name)))
}
}
}
@@ -1162,10 +1170,10 @@ class KafkaApis(val requestChannel: RequestChannel,
sendResponseMaybeThrottle(request, requestThrottleMs =>
MetadataResponse.prepareResponse(
requestThrottleMs,
+ completeTopicMetadata.asJava,
brokers.flatMap(_.getNode(request.context.listenerName)).asJava,
clusterId,
metadataCache.getControllerId.getOrElse(MetadataResponse.NO_CONTROLLER_ID),
- completeTopicMetadata.asJava,
clusterAuthorizedOperations
))
}
@@ -1285,13 +1293,13 @@ class KafkaApis(val requestChannel: RequestChannel,
.setPort(node.port)
.setThrottleTimeMs(requestThrottleMs))
}
- val responseBody = if (topicMetadata.error != Errors.NONE) {
+ val responseBody = if (topicMetadata.errorCode != Errors.NONE.code) {
createFindCoordinatorResponse(Errors.COORDINATOR_NOT_AVAILABLE, Node.noNode)
} else {
- val coordinatorEndpoint = topicMetadata.partitionMetadata.asScala
- .find(_.partition == partition)
- .filter(_.leaderId.isPresent)
- .flatMap(metadata => metadataCache.getAliveBroker(metadata.leaderId.get))
+ val coordinatorEndpoint = topicMetadata.partitions.asScala
+ .find(_.partitionIndex == partition)
+ .filter(_.leaderId != MetadataResponse.NO_LEADER_ID)
+ .flatMap(metadata => metadataCache.getAliveBroker(metadata.leaderId))
.flatMap(_.getNode(request.context.listenerName))
.filterNot(_.isEmpty)
diff --git a/core/src/main/scala/kafka/server/MetadataCache.scala b/core/src/main/scala/kafka/server/MetadataCache.scala
index 10c25b594984a..ee06dcd8cfa32 100755
--- a/core/src/main/scala/kafka/server/MetadataCache.scala
+++ b/core/src/main/scala/kafka/server/MetadataCache.scala
@@ -18,10 +18,10 @@
package kafka.server
import java.util
-import java.util.{Collections, Optional}
+import java.util.Collections
import java.util.concurrent.locks.ReentrantReadWriteLock
-import scala.collection.{Seq, Set, mutable}
+import scala.collection.{mutable, Seq, Set}
import scala.collection.JavaConverters._
import kafka.cluster.{Broker, EndPoint}
import kafka.api._
@@ -31,6 +31,8 @@ import kafka.utils.Logging
import org.apache.kafka.common.internals.Topic
import org.apache.kafka.common.message.UpdateMetadataRequestData.UpdateMetadataPartitionState
import org.apache.kafka.common.{Cluster, Node, PartitionInfo, TopicPartition}
+import org.apache.kafka.common.message.MetadataResponseData.MetadataResponseTopic
+import org.apache.kafka.common.message.MetadataResponseData.MetadataResponsePartition
import org.apache.kafka.common.network.ListenerName
import org.apache.kafka.common.protocol.Errors
import org.apache.kafka.common.requests.{MetadataResponse, UpdateMetadataRequest}
@@ -78,7 +80,7 @@ class MetadataCache(brokerId: Int) extends Logging {
// If errorUnavailableListeners=true, return LISTENER_NOT_FOUND if listener is missing on the broker.
// Otherwise, return LEADER_NOT_AVAILABLE for broker unavailable and missing listener (Metadata response v5 and below).
private def getPartitionMetadata(snapshot: MetadataSnapshot, topic: String, listenerName: ListenerName, errorUnavailableEndpoints: Boolean,
- errorUnavailableListeners: Boolean): Option[Iterable[MetadataResponse.PartitionMetadata]] = {
+ errorUnavailableListeners: Boolean): Option[Iterable[MetadataResponsePartition]] = {
snapshot.partitionStates.get(topic).map { partitions =>
partitions.map { case (partitionId, partitionState) =>
val topicPartition = new TopicPartition(topic, partitionId.toInt)
@@ -104,24 +106,37 @@ class MetadataCache(brokerId: Int) extends Logging {
s"not found on leader $leaderBrokerId")
if (errorUnavailableListeners) Errors.LISTENER_NOT_FOUND else Errors.LEADER_NOT_AVAILABLE
}
- new MetadataResponse.PartitionMetadata(error, topicPartition, Optional.empty(),
- Optional.of(leaderEpoch), filteredReplicas, filteredIsr, offlineReplicas)
+
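+ // Leader broker or its listener is unavailable: report the chosen error with NO_LEADER_ID.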
+ new MetadataResponsePartition()
+ .setErrorCode(error.code)
+ .setPartitionIndex(partitionId.toInt)
+ .setLeaderId(MetadataResponse.NO_LEADER_ID)
+ .setLeaderEpoch(leaderEpoch)
+ .setReplicaNodes(filteredReplicas)
+ .setIsrNodes(filteredIsr)
+ .setOfflineReplicas(offlineReplicas)
case Some(leader) =>
- if (filteredReplicas.size < replicas.size) {
+ val error = if (filteredReplicas.size < replicas.size) {
debug(s"Error while fetching metadata for $topicPartition: replica information not available for " +
s"following brokers ${replicas.asScala.filterNot(filteredReplicas.contains).mkString(",")}")
- new MetadataResponse.PartitionMetadata(Errors.REPLICA_NOT_AVAILABLE, topicPartition,
- Optional.of(leader.id), Optional.of(leaderEpoch), filteredReplicas, filteredIsr, offlineReplicas)
+ Errors.REPLICA_NOT_AVAILABLE
} else if (filteredIsr.size < isr.size) {
debug(s"Error while fetching metadata for $topicPartition: in sync replica information not available for " +
s"following brokers ${isr.asScala.filterNot(filteredIsr.contains).mkString(",")}")
- new MetadataResponse.PartitionMetadata(Errors.REPLICA_NOT_AVAILABLE, topicPartition,
- Optional.of(leader.id), Optional.of(leaderEpoch), filteredReplicas, filteredIsr, offlineReplicas)
+ Errors.REPLICA_NOT_AVAILABLE
} else {
- new MetadataResponse.PartitionMetadata(Errors.NONE, topicPartition,
- Optional.of(leader.id), Optional.of(leaderEpoch), filteredReplicas, filteredIsr, offlineReplicas)
+ Errors.NONE
}
+
+ new MetadataResponsePartition()
+ .setErrorCode(error.code)
+ .setPartitionIndex(partitionId.toInt)
+ .setLeaderId(maybeLeader.map(_.id()).getOrElse(MetadataResponse.NO_LEADER_ID))
+ .setLeaderEpoch(leaderEpoch)
+ .setReplicaNodes(filteredReplicas)
+ .setIsrNodes(filteredIsr)
+ .setOfflineReplicas(offlineReplicas)
}
}
}
@@ -150,11 +165,15 @@ class MetadataCache(brokerId: Int) extends Logging {
def getTopicMetadata(topics: Set[String],
listenerName: ListenerName,
errorUnavailableEndpoints: Boolean = false,
- errorUnavailableListeners: Boolean = false): Seq[MetadataResponse.TopicMetadata] = {
+ errorUnavailableListeners: Boolean = false): Seq[MetadataResponseTopic] = {
val snapshot = metadataSnapshot
topics.toSeq.flatMap { topic =>
getPartitionMetadata(snapshot, topic, listenerName, errorUnavailableEndpoints, errorUnavailableListeners).map { partitionMetadata =>
- new MetadataResponse.TopicMetadata(Errors.NONE, topic, Topic.isInternal(topic), partitionMetadata.toBuffer.asJava)
+ new MetadataResponseTopic()
+ .setErrorCode(Errors.NONE.code)
+ .setName(topic)
+ .setIsInternal(Topic.isInternal(topic))
+ .setPartitions(partitionMetadata.toBuffer.asJava)
}
}
}
diff --git a/core/src/test/scala/unit/kafka/admin/TopicCommandWithAdminClientTest.scala b/core/src/test/scala/unit/kafka/admin/TopicCommandWithAdminClientTest.scala
index 5281d80d4ffad..cdeb49a120846 100644
--- a/core/src/test/scala/unit/kafka/admin/TopicCommandWithAdminClientTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/TopicCommandWithAdminClientTest.scala
@@ -580,11 +580,11 @@ class TopicCommandWithAdminClientTest extends KafkaServerTestHarness with Loggin
(result, server) => {
val topicMetadatas = server.dataPlaneRequestProcessor.metadataCache
.getTopicMetadata(Set(testTopicName), ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT))
- val testPartitionMetadata = topicMetadatas.find(_.topic().equals(testTopicName)).get.partitionMetadata().asScala.find(_.partition() == partitionOnBroker0)
+ val testPartitionMetadata = topicMetadatas.find(_.name.equals(testTopicName)).get.partitions.asScala.find(_.partitionIndex == partitionOnBroker0)
testPartitionMetadata match {
case None => fail(s"Partition metadata is not found in metadata cache")
case Some(metadata) => {
- result && metadata.error == Errors.LEADER_NOT_AVAILABLE
+ result && metadata.errorCode == Errors.LEADER_NOT_AVAILABLE.code
}
}
}
diff --git a/core/src/test/scala/unit/kafka/server/MetadataCacheTest.scala b/core/src/test/scala/unit/kafka/server/MetadataCacheTest.scala
index 60cff13898bf8..ce804e81ed244 100644
--- a/core/src/test/scala/unit/kafka/server/MetadataCacheTest.scala
+++ b/core/src/test/scala/unit/kafka/server/MetadataCacheTest.scala
@@ -17,12 +17,12 @@
package kafka.server
import java.util
-import java.util.Optional
import util.Arrays.asList
import org.apache.kafka.common.message.UpdateMetadataRequestData.{UpdateMetadataBroker, UpdateMetadataEndpoint, UpdateMetadataPartitionState}
import org.apache.kafka.common.network.ListenerName
import org.apache.kafka.common.protocol.{ApiKeys, Errors}
+import org.apache.kafka.common.record.RecordBatch
import org.apache.kafka.common.requests.UpdateMetadataRequest
import org.apache.kafka.common.security.auth.SecurityProtocol
import org.junit.Test
@@ -118,22 +118,22 @@ class MetadataCacheTest {
assertEquals(1, topicMetadatas.size)
val topicMetadata = topicMetadatas.head
- assertEquals(Errors.NONE, topicMetadata.error)
- assertEquals(topic, topicMetadata.topic)
+ assertEquals(Errors.NONE.code, topicMetadata.errorCode)
+ assertEquals(topic, topicMetadata.name)
val topicPartitionStates = partitionStates.filter { ps => ps.topicName == topic }
- val partitionMetadatas = topicMetadata.partitionMetadata.asScala.sortBy(_.partition)
+ val partitionMetadatas = topicMetadata.partitions.asScala.sortBy(_.partitionIndex)
assertEquals(s"Unexpected partition count for topic $topic", topicPartitionStates.size, partitionMetadatas.size)
partitionMetadatas.zipWithIndex.foreach { case (partitionMetadata, partitionId) =>
- assertEquals(Errors.NONE, partitionMetadata.error)
- assertEquals(partitionId, partitionMetadata.partition)
+ assertEquals(Errors.NONE.code, partitionMetadata.errorCode)
+ assertEquals(partitionId, partitionMetadata.partitionIndex)
val partitionState = topicPartitionStates.find(_.partitionIndex == partitionId).getOrElse(
Assertions.fail(s"Unable to find partition state for partition $partitionId"))
- assertEquals(Optional.of(partitionState.leader), partitionMetadata.leaderId)
- assertEquals(Optional.of(partitionState.leaderEpoch), partitionMetadata.leaderEpoch)
- assertEquals(partitionState.isr, partitionMetadata.inSyncReplicaIds)
- assertEquals(partitionState.replicas, partitionMetadata.replicaIds)
+ assertEquals(partitionState.leader, partitionMetadata.leaderId)
+ assertEquals(partitionState.leaderEpoch, partitionMetadata.leaderEpoch)
+ assertEquals(partitionState.isr, partitionMetadata.isrNodes)
+ assertEquals(partitionState.replicas, partitionMetadata.replicaNodes)
}
}
@@ -255,16 +255,16 @@ class MetadataCacheTest {
assertEquals(1, topicMetadatas.size)
val topicMetadata = topicMetadatas.head
- assertEquals(Errors.NONE, topicMetadata.error)
+ assertEquals(Errors.NONE.code, topicMetadata.errorCode)
- val partitionMetadatas = topicMetadata.partitionMetadata
+ val partitionMetadatas = topicMetadata.partitions
assertEquals(1, partitionMetadatas.size)
val partitionMetadata = partitionMetadatas.get(0)
- assertEquals(0, partitionMetadata.partition)
- assertEquals(expectedError, partitionMetadata.error)
- assertFalse(partitionMetadata.inSyncReplicaIds.isEmpty)
- assertEquals(List(0), partitionMetadata.replicaIds.asScala)
+ assertEquals(0, partitionMetadata.partitionIndex)
+ assertEquals(expectedError.code, partitionMetadata.errorCode)
+ assertFalse(partitionMetadata.isrNodes.isEmpty)
+ assertEquals(List(0), partitionMetadata.replicaNodes.asScala)
}
@Test
@@ -313,32 +313,32 @@ class MetadataCacheTest {
assertEquals(1, topicMetadatas.size)
val topicMetadata = topicMetadatas.head
- assertEquals(Errors.NONE, topicMetadata.error)
+ assertEquals(Errors.NONE.code(), topicMetadata.errorCode)
- val partitionMetadatas = topicMetadata.partitionMetadata
+ val partitionMetadatas = topicMetadata.partitions
assertEquals(1, partitionMetadatas.size)
val partitionMetadata = partitionMetadatas.get(0)
- assertEquals(0, partitionMetadata.partition)
- assertEquals(Errors.NONE, partitionMetadata.error)
- assertEquals(Set(0, 1), partitionMetadata.replicaIds.asScala.toSet)
- assertEquals(Set(0), partitionMetadata.inSyncReplicaIds.asScala.toSet)
+ assertEquals(0, partitionMetadata.partitionIndex)
+ assertEquals(Errors.NONE.code, partitionMetadata.errorCode)
+ assertEquals(Set(0, 1), partitionMetadata.replicaNodes.asScala.toSet)
+ assertEquals(Set(0), partitionMetadata.isrNodes.asScala.toSet)
// Validate errorUnavailableEndpoints = true
val topicMetadatasWithError = cache.getTopicMetadata(Set(topic), listenerName, errorUnavailableEndpoints = true)
assertEquals(1, topicMetadatasWithError.size)
val topicMetadataWithError = topicMetadatasWithError.head
- assertEquals(Errors.NONE, topicMetadataWithError.error)
+ assertEquals(Errors.NONE.code, topicMetadataWithError.errorCode)
- val partitionMetadatasWithError = topicMetadataWithError.partitionMetadata
+ val partitionMetadatasWithError = topicMetadataWithError.partitions()
assertEquals(1, partitionMetadatasWithError.size)
val partitionMetadataWithError = partitionMetadatasWithError.get(0)
- assertEquals(0, partitionMetadataWithError.partition)
- assertEquals(Errors.REPLICA_NOT_AVAILABLE, partitionMetadataWithError.error)
- assertEquals(Set(0), partitionMetadataWithError.replicaIds.asScala.toSet)
- assertEquals(Set(0), partitionMetadataWithError.inSyncReplicaIds.asScala.toSet)
+ assertEquals(0, partitionMetadataWithError.partitionIndex)
+ assertEquals(Errors.REPLICA_NOT_AVAILABLE.code, partitionMetadataWithError.errorCode)
+ assertEquals(Set(0), partitionMetadataWithError.replicaNodes.asScala.toSet)
+ assertEquals(Set(0), partitionMetadataWithError.isrNodes.asScala.toSet)
}
@Test
@@ -387,32 +387,32 @@ class MetadataCacheTest {
assertEquals(1, topicMetadatas.size)
val topicMetadata = topicMetadatas.head
- assertEquals(Errors.NONE, topicMetadata.error)
+ assertEquals(Errors.NONE.code(), topicMetadata.errorCode)
- val partitionMetadatas = topicMetadata.partitionMetadata
+ val partitionMetadatas = topicMetadata.partitions
assertEquals(1, partitionMetadatas.size)
val partitionMetadata = partitionMetadatas.get(0)
- assertEquals(0, partitionMetadata.partition)
- assertEquals(Errors.NONE, partitionMetadata.error)
- assertEquals(Set(0), partitionMetadata.replicaIds.asScala.toSet)
- assertEquals(Set(0, 1), partitionMetadata.inSyncReplicaIds.asScala.toSet)
+ assertEquals(0, partitionMetadata.partitionIndex)
+ assertEquals(Errors.NONE.code, partitionMetadata.errorCode)
+ assertEquals(Set(0), partitionMetadata.replicaNodes.asScala.toSet)
+ assertEquals(Set(0, 1), partitionMetadata.isrNodes.asScala.toSet)
// Validate errorUnavailableEndpoints = true
val topicMetadatasWithError = cache.getTopicMetadata(Set(topic), listenerName, errorUnavailableEndpoints = true)
assertEquals(1, topicMetadatasWithError.size)
val topicMetadataWithError = topicMetadatasWithError.head
- assertEquals(Errors.NONE, topicMetadataWithError.error)
+ assertEquals(Errors.NONE.code, topicMetadataWithError.errorCode)
- val partitionMetadatasWithError = topicMetadataWithError.partitionMetadata
+ val partitionMetadatasWithError = topicMetadataWithError.partitions
assertEquals(1, partitionMetadatasWithError.size)
val partitionMetadataWithError = partitionMetadatasWithError.get(0)
- assertEquals(0, partitionMetadataWithError.partition)
- assertEquals(Errors.REPLICA_NOT_AVAILABLE, partitionMetadataWithError.error)
- assertEquals(Set(0), partitionMetadataWithError.replicaIds.asScala.toSet)
- assertEquals(Set(0), partitionMetadataWithError.inSyncReplicaIds.asScala.toSet)
+ assertEquals(0, partitionMetadataWithError.partitionIndex)
+ assertEquals(Errors.REPLICA_NOT_AVAILABLE.code, partitionMetadataWithError.errorCode)
+ assertEquals(Set(0), partitionMetadataWithError.replicaNodes.asScala.toSet)
+ assertEquals(Set(0), partitionMetadataWithError.isrNodes.asScala.toSet)
}
@Test
@@ -449,8 +449,8 @@ class MetadataCacheTest {
val topicMetadata = cache.getTopicMetadata(Set(topic), ListenerName.forSecurityProtocol(SecurityProtocol.SSL))
assertEquals(1, topicMetadata.size)
- assertEquals(1, topicMetadata.head.partitionMetadata.size)
- assertEquals(Optional.empty, topicMetadata.head.partitionMetadata.get(0).leaderId)
+ assertEquals(1, topicMetadata.head.partitions.size)
+ assertEquals(RecordBatch.NO_PARTITION_LEADER_EPOCH, topicMetadata.head.partitions.get(0).leaderId)
}
@Test
diff --git a/gradle/spotbugs-exclude.xml b/gradle/spotbugs-exclude.xml
index 175ad6a264353..eb23fee0efd47 100644
--- a/gradle/spotbugs-exclude.xml
+++ b/gradle/spotbugs-exclude.xml
@@ -431,4 +431,10 @@ For a detailed description of spotbugs bug categories, see https://spotbugs.read
+
+
+
+
+
+
diff --git a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/metadata/MetadataRequestBenchmark.java b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/metadata/MetadataRequestBenchmark.java
new file mode 100644
index 0000000000000..378dbd48d0e97
--- /dev/null
+++ b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/metadata/MetadataRequestBenchmark.java
@@ -0,0 +1,206 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.jmh.metadata;
+
+import kafka.controller.KafkaController;
+import kafka.coordinator.group.GroupCoordinator;
+import kafka.coordinator.transaction.TransactionCoordinator;
+import kafka.network.RequestChannel;
+import kafka.server.AdminManager;
+import kafka.server.BrokerTopicStats;
+import kafka.server.ClientQuotaManager;
+import kafka.server.ClientRequestQuotaManager;
+import kafka.server.FetchManager;
+import kafka.server.KafkaApis;
+import kafka.server.KafkaConfig;
+import kafka.server.KafkaConfig$;
+import kafka.server.MetadataCache;
+import kafka.server.QuotaFactory;
+import kafka.server.ReplicaManager;
+import kafka.server.ReplicationQuotaManager;
+import kafka.zk.KafkaZkClient;
+import org.apache.kafka.common.memory.MemoryPool;
+import org.apache.kafka.common.message.UpdateMetadataRequestData.UpdateMetadataBroker;
+import org.apache.kafka.common.message.UpdateMetadataRequestData.UpdateMetadataEndpoint;
+import org.apache.kafka.common.message.UpdateMetadataRequestData.UpdateMetadataPartitionState;
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.network.ClientInformation;
+import org.apache.kafka.common.network.ListenerName;
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.requests.MetadataRequest;
+import org.apache.kafka.common.requests.RequestContext;
+import org.apache.kafka.common.requests.RequestHeader;
+import org.apache.kafka.common.requests.UpdateMetadataRequest;
+import org.apache.kafka.common.security.auth.KafkaPrincipal;
+import org.apache.kafka.common.security.auth.SecurityProtocol;
+import org.apache.kafka.common.utils.SystemTime;
+import org.mockito.Mockito;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.BenchmarkMode;
+import org.openjdk.jmh.annotations.Fork;
+import org.openjdk.jmh.annotations.Level;
+import org.openjdk.jmh.annotations.Measurement;
+import org.openjdk.jmh.annotations.Mode;
+import org.openjdk.jmh.annotations.OutputTimeUnit;
+import org.openjdk.jmh.annotations.Param;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.TearDown;
+import org.openjdk.jmh.annotations.Warmup;
+import scala.Option;
+
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.IntStream;
+
+@State(Scope.Benchmark)
+@Fork(value = 1)
+@Warmup(iterations = 5)
+@Measurement(iterations = 15)
+@BenchmarkMode(Mode.AverageTime)
+@OutputTimeUnit(TimeUnit.NANOSECONDS)
+
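+// Benchmarks KafkaApis.handleTopicMetadataRequest for an all-topics MetadataRequest against a
+// MetadataCache pre-populated with topicCount topics of partitionCount partitions each.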
+public class MetadataRequestBenchmark {
+ @Param({"500", "1000", "5000"})
+ private int topicCount;
+ @Param({"10", "20", "50"})
+ private int partitionCount;
+
+ private RequestChannel requestChannel = Mockito.mock(RequestChannel.class, Mockito.withSettings().stubOnly());
+ private RequestChannel.Metrics requestChannelMetrics = Mockito.mock(RequestChannel.Metrics.class);
+ private ReplicaManager replicaManager = Mockito.mock(ReplicaManager.class);
+ private GroupCoordinator groupCoordinator = Mockito.mock(GroupCoordinator.class);
+ private AdminManager adminManager = Mockito.mock(AdminManager.class);
+ private TransactionCoordinator transactionCoordinator = Mockito.mock(TransactionCoordinator.class);
+ private KafkaController kafkaController = Mockito.mock(KafkaController.class);
+ private KafkaZkClient kafkaZkClient = Mockito.mock(KafkaZkClient.class);
+ private Metrics metrics = new Metrics();
+ private int brokerId = 1;
+ private MetadataCache metadataCache = new MetadataCache(brokerId);
+ private ClientQuotaManager clientQuotaManager = Mockito.mock(ClientQuotaManager.class);
+ private ClientRequestQuotaManager clientRequestQuotaManager = Mockito.mock(ClientRequestQuotaManager.class);
+ private ReplicationQuotaManager replicaQuotaManager = Mockito.mock(ReplicationQuotaManager.class);
+ private QuotaFactory.QuotaManagers quotaManagers = new QuotaFactory.QuotaManagers(clientQuotaManager,
+ clientQuotaManager, clientRequestQuotaManager, replicaQuotaManager, replicaQuotaManager,
+ replicaQuotaManager, Option.empty());
+ private FetchManager fetchManager = Mockito.mock(FetchManager.class);
+ private BrokerTopicStats brokerTopicStats = new BrokerTopicStats();
+ private KafkaPrincipal principal = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "test-user");
+ private KafkaApis kafkaApis;
+ private RequestChannel.Request allTopicMetadataRequest;
+
+ @Setup(Level.Trial)
+ public void setup() {
+ initializeMetadataCache();
+ kafkaApis = createKafkaApis();
+ allTopicMetadataRequest = buildAllTopicMetadataRequest();
+ }
+
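+ // Builds an UpdateMetadataRequest with five live brokers and topicCount * partitionCount
+ // partition states, then applies it to the MetadataCache under test.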
+ private void initializeMetadataCache() {
+ List<UpdateMetadataBroker> liveBrokers = new LinkedList<>();
+ List<UpdateMetadataPartitionState> partitionStates = new LinkedList<>();
+
+ IntStream.range(0, 5).forEach(brokerId -> liveBrokers.add(
+ new UpdateMetadataBroker().setId(brokerId)
+ .setEndpoints(endpoints(brokerId))
+ .setRack("rack1")));
+
+ IntStream.range(0, topicCount).forEach(topicId -> {
+ String topicName = "topic-" + topicId;
+
+ IntStream.range(0, partitionCount).forEach(partitionId -> {
+ partitionStates.add(
+ new UpdateMetadataPartitionState().setTopicName(topicName)
+ .setPartitionIndex(partitionId)
+ .setControllerEpoch(1)
+ .setLeader(partitionCount % 5)
+ .setLeaderEpoch(0)
+ .setIsr(Arrays.asList(0, 1, 3))
+ .setZkVersion(1)
+ .setReplicas(Arrays.asList(0, 1, 3)));
+ });
+ });
+
+ UpdateMetadataRequest updateMetadataRequest = new UpdateMetadataRequest.Builder(
+ ApiKeys.UPDATE_METADATA.latestVersion(),
+ 1, 1, 1,
+ partitionStates, liveBrokers).build();
+ metadataCache.updateMetadata(100, updateMetadataRequest);
+ }
+
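+ // Each broker advertises a single PLAINTEXT endpoint, host_<brokerId>:9092.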
+ private List<UpdateMetadataEndpoint> endpoints(final int brokerId) {
+ return Collections.singletonList(
+ new UpdateMetadataEndpoint()
+ .setHost("host_" + brokerId)
+ .setPort(9092)
+ .setSecurityProtocol(SecurityProtocol.PLAINTEXT.id)
+ .setListener(ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT).value()));
+ }
+
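+ // Wires a KafkaApis instance from the mocked dependencies above; only the ZK connect string and
+ // broker id are set explicitly, everything else uses KafkaConfig defaults.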
+ private KafkaApis createKafkaApis() {
+ Properties kafkaProps = new Properties();
+ kafkaProps.put(KafkaConfig$.MODULE$.ZkConnectProp(), "zk");
+ kafkaProps.put(KafkaConfig$.MODULE$.BrokerIdProp(), brokerId + "");
+ return new KafkaApis(requestChannel,
+ replicaManager,
+ adminManager,
+ groupCoordinator,
+ transactionCoordinator,
+ kafkaController,
+ kafkaZkClient,
+ brokerId,
+ new KafkaConfig(kafkaProps),
+ metadataCache,
+ metrics,
+ Option.empty(),
+ quotaManagers,
+ fetchManager,
+ brokerTopicStats,
+ "clusterId",
+ new SystemTime(),
+ null);
+ }
+
+ @TearDown(Level.Trial)
+ public void tearDown() {
+ kafkaApis.close();
+ metrics.close();
+ }
+
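+ // Serializes an all-topics MetadataRequest and wraps it in a RequestChannel.Request, mirroring
+ // what KafkaApis receives from the network layer.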
+ private RequestChannel.Request buildAllTopicMetadataRequest() {
+ MetadataRequest metadataRequest = MetadataRequest.Builder.allTopics().build();
+ ByteBuffer buffer = metadataRequest.serialize(new RequestHeader(metadataRequest.api,
+ metadataRequest.version(), "", 0));
+ RequestHeader header = RequestHeader.parse(buffer);
+
+ RequestContext context = new RequestContext(header, "1", null, principal,
+ ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT), SecurityProtocol.PLAINTEXT, ClientInformation.EMPTY);
+ return new RequestChannel.Request(1, context, 0, MemoryPool.NONE, buffer, requestChannelMetrics);
+ }
+
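+ // The measured operation: KafkaApis handles the all-topics metadata request against the
+ // pre-populated cache.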
+ @Benchmark
+ public void testMetadataRequestForAllTopics() {
+ kafkaApis.handleTopicMetadataRequest(allTopicMetadataRequest);
+ }
+}