Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -1553,7 +1553,7 @@ project(':jmh-benchmarks') {

doFirst {
if (System.getProperty("jmhArgs")) {
args System.getProperty("jmhArgs").split(',')
args System.getProperty("jmhArgs").split(' ')
}
args = [shadowJar.archivePath, *args]
}
Expand Down
4 changes: 4 additions & 0 deletions checkstyle/import-control-jmh-benchmarks.xml
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,10 @@
<allow pkg="kafka.log"/>
<allow pkg="kafka.server"/>
<allow pkg="kafka.api"/>
<allow pkg="kafka.controller"/>
<allow pkg="kafka.coordinator"/>
<allow pkg="kafka.network"/>
<allow pkg="kafka.zk"/>
<allow class="kafka.utils.Pool"/>
<allow class="kafka.utils.KafkaScheduler"/>
<allow class="org.apache.kafka.clients.FetchSessionHandler"/>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -478,6 +478,27 @@ public static MetadataResponse prepareResponse(Collection<Node> brokers, String
return prepareResponse(AbstractResponse.DEFAULT_THROTTLE_TIME, brokers, clusterId, controllerId, topicMetadata);
}

/**
 * Builds a {@link MetadataResponse} directly from the auto-generated
 * {@code MetadataResponseData} message, filling in the broker list, cluster id,
 * controller id, authorized operations and per-topic metadata.
 *
 * @param throttleTimeMs              value recorded as the response's throttle time
 * @param topicMetadataList           per-topic metadata entries appended to the response topics
 * @param brokers                     nodes advertised as the cluster's brokers; each is copied
 *                                    into a {@code MetadataResponseBroker} (id, host, port, rack)
 * @param clusterId                   cluster id string set on the response
 * @param controllerId                node id set as the response's controller id
 * @param clusterAuthorizedOperations bit field set as the cluster authorized operations
 *                                    (callers pass AUTHORIZED_OPERATIONS_OMITTED when not requested)
 */
public static MetadataResponse prepareResponse(int throttleTimeMs, List<MetadataResponseTopic> topicMetadataList,
Collection<Node> brokers, String clusterId, int controllerId,
int clusterAuthorizedOperations) {
MetadataResponseData responseData = new MetadataResponseData();
responseData.setThrottleTimeMs(throttleTimeMs);
// Translate each Node into the generated broker message; rack is passed
// through as-is (NOTE(review): presumably nullable — confirm against schema).
brokers.forEach(broker ->
responseData.brokers().add(new MetadataResponseBroker()
.setNodeId(broker.id())
.setHost(broker.host())
.setPort(broker.port())
.setRack(broker.rack()))
);

responseData.setClusterId(clusterId);
responseData.setControllerId(controllerId);
responseData.setClusterAuthorizedOperations(clusterAuthorizedOperations);

// Append topics in the order supplied by the caller.
topicMetadataList.forEach(topicMetadata -> responseData.topics().add(topicMetadata));
return new MetadataResponse(responseData);
}

@Override
public boolean shouldClientThrottle(short version) {
return version >= 6;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,8 @@
import org.apache.kafka.common.message.LeaveGroupResponseData.MemberResponse;
import org.apache.kafka.common.message.ListGroupsResponseData;
import org.apache.kafka.common.message.ListPartitionReassignmentsResponseData;
import org.apache.kafka.common.message.MetadataResponseData.MetadataResponseTopic;
import org.apache.kafka.common.message.MetadataResponseData.MetadataResponsePartition;
import org.apache.kafka.common.message.OffsetDeleteResponseData;
import org.apache.kafka.common.message.OffsetDeleteResponseData.OffsetDeleteResponsePartition;
import org.apache.kafka.common.message.OffsetDeleteResponseData.OffsetDeleteResponsePartitionCollection;
Expand Down Expand Up @@ -112,8 +114,6 @@
import org.apache.kafka.common.requests.ListPartitionReassignmentsResponse;
import org.apache.kafka.common.requests.MetadataRequest;
import org.apache.kafka.common.requests.MetadataResponse;
import org.apache.kafka.common.requests.MetadataResponse.PartitionMetadata;
import org.apache.kafka.common.requests.MetadataResponse.TopicMetadata;
import org.apache.kafka.common.requests.OffsetCommitResponse;
import org.apache.kafka.common.requests.OffsetDeleteResponse;
import org.apache.kafka.common.requests.OffsetFetchResponse;
Expand Down Expand Up @@ -342,28 +342,33 @@ private static FindCoordinatorResponse prepareFindCoordinatorResponse(Errors err
}

private static MetadataResponse prepareMetadataResponse(Cluster cluster, Errors error) {
List<TopicMetadata> metadata = new ArrayList<>();
List<MetadataResponseTopic> metadata = new ArrayList<>();
for (String topic : cluster.topics()) {
List<PartitionMetadata> pms = new ArrayList<>();
List<MetadataResponsePartition> pms = new ArrayList<>();
for (PartitionInfo pInfo : cluster.availablePartitionsForTopic(topic)) {
PartitionMetadata pm = new PartitionMetadata(error,
new TopicPartition(topic, pInfo.partition()),
Optional.of(pInfo.leader().id()),
Optional.of(234),
Arrays.stream(pInfo.replicas()).map(Node::id).collect(Collectors.toList()),
Arrays.stream(pInfo.inSyncReplicas()).map(Node::id).collect(Collectors.toList()),
Arrays.stream(pInfo.offlineReplicas()).map(Node::id).collect(Collectors.toList()));
MetadataResponsePartition pm = new MetadataResponsePartition()
.setErrorCode(error.code())
.setPartitionIndex(pInfo.partition())
.setLeaderId(pInfo.leader().id())
.setLeaderEpoch(234)
.setReplicaNodes(Arrays.stream(pInfo.replicas()).map(Node::id).collect(Collectors.toList()))
.setIsrNodes(Arrays.stream(pInfo.inSyncReplicas()).map(Node::id).collect(Collectors.toList()))
.setOfflineReplicas(Arrays.stream(pInfo.offlineReplicas()).map(Node::id).collect(Collectors.toList()));
pms.add(pm);
}
TopicMetadata tm = new TopicMetadata(error, topic, false, pms);
MetadataResponseTopic tm = new MetadataResponseTopic()
.setErrorCode(error.code())
.setName(topic)
.setIsInternal(false)
.setPartitions(pms);
metadata.add(tm);
}
return MetadataResponse.prepareResponse(0,
cluster.nodes(),
cluster.clusterResource().clusterId(),
cluster.controller().id(),
metadata,
MetadataResponse.AUTHORIZED_OPERATIONS_OMITTED);
metadata,
cluster.nodes(),
cluster.clusterResource().clusterId(),
cluster.controller().id(),
MetadataResponse.AUTHORIZED_OPERATIONS_OMITTED);
}

/**
Expand Down Expand Up @@ -1107,19 +1112,19 @@ public void testDescribeCluster() throws Exception {

// Prepare the metadata response used for the first describe cluster
MetadataResponse response = MetadataResponse.prepareResponse(0,
Collections.emptyList(),
env.cluster().nodes(),
env.cluster().clusterResource().clusterId(),
2,
Collections.emptyList(),
MetadataResponse.AUTHORIZED_OPERATIONS_OMITTED);
env.kafkaClient().prepareResponse(response);

// Prepare the metadata response used for the second describe cluster
MetadataResponse response2 = MetadataResponse.prepareResponse(0,
Collections.emptyList(),
env.cluster().nodes(),
env.cluster().clusterResource().clusterId(),
3,
Collections.emptyList(),
1 << AclOperation.DESCRIBE.code() | 1 << AclOperation.ALTER.code());
env.kafkaClient().prepareResponse(response2);

Expand Down
64 changes: 36 additions & 28 deletions core/src/main/scala/kafka/server/KafkaApis.scala
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,8 @@ import org.apache.kafka.common.security.auth.{KafkaPrincipal, SecurityProtocol}
import org.apache.kafka.common.security.token.delegation.{DelegationToken, TokenInformation}
import org.apache.kafka.common.utils.{ProducerIdAndEpoch, Time, Utils}
import org.apache.kafka.common.{Node, TopicPartition}
import org.apache.kafka.common.message.MetadataResponseData
import org.apache.kafka.common.message.MetadataResponseData.{MetadataResponsePartition, MetadataResponseTopic}
import org.apache.kafka.server.authorizer._

import scala.compat.java8.OptionConverters._
Expand Down Expand Up @@ -1000,24 +1002,30 @@ class KafkaApis(val requestChannel: RequestChannel,
private def createTopic(topic: String,
numPartitions: Int,
replicationFactor: Int,
properties: util.Properties = new util.Properties()): MetadataResponse.TopicMetadata = {
properties: util.Properties = new util.Properties()): MetadataResponseTopic = {
try {
adminZkClient.createTopic(topic, numPartitions, replicationFactor, properties, RackAwareMode.Safe)
info("Auto creation of topic %s with %d partitions and replication factor %d is successful"
.format(topic, numPartitions, replicationFactor))
new MetadataResponse.TopicMetadata(Errors.LEADER_NOT_AVAILABLE, topic, isInternal(topic),
util.Collections.emptyList())
metadataResponseTopic(Errors.LEADER_NOT_AVAILABLE, topic, isInternal(topic), util.Collections.emptyList())
} catch {
case _: TopicExistsException => // let it go, possibly another broker created this topic
new MetadataResponse.TopicMetadata(Errors.LEADER_NOT_AVAILABLE, topic, isInternal(topic),
util.Collections.emptyList())
metadataResponseTopic(Errors.LEADER_NOT_AVAILABLE, topic, isInternal(topic), util.Collections.emptyList())
case ex: Throwable => // Catch all to prevent unhandled errors
new MetadataResponse.TopicMetadata(Errors.forException(ex), topic, isInternal(topic),
util.Collections.emptyList())
metadataResponseTopic(Errors.forException(ex), topic, isInternal(topic), util.Collections.emptyList())
}
}

private def createInternalTopic(topic: String): MetadataResponse.TopicMetadata = {
/**
 * Builds a generated [[MetadataResponseTopic]] populated with the given error
 * code, topic name, internal-topic flag and partition metadata. Used by the
 * topic-creation and metadata paths in place of constructing the message inline.
 */
private def metadataResponseTopic(error: Errors, topic: String, isInternal: Boolean,
partitionData: util.List[MetadataResponsePartition]): MetadataResponseTopic = {
new MetadataResponseTopic()
.setErrorCode(error.code)
.setName(topic)
.setIsInternal(isInternal)
.setPartitions(partitionData)
}

private def createInternalTopic(topic: String): MetadataResponseTopic = {
if (topic == null)
throw new IllegalArgumentException("topic must not be null")

Expand All @@ -1030,7 +1038,7 @@ class KafkaApis(val requestChannel: RequestChannel,
s"'${config.offsetsTopicReplicationFactor}' for the offsets topic (configured via " +
s"'${KafkaConfig.OffsetsTopicReplicationFactorProp}'). This error can be ignored if the cluster is starting up " +
s"and not all brokers are up yet.")
new MetadataResponse.TopicMetadata(Errors.COORDINATOR_NOT_AVAILABLE, topic, true, util.Collections.emptyList())
metadataResponseTopic(Errors.COORDINATOR_NOT_AVAILABLE, topic, true, util.Collections.emptyList())
} else {
createTopic(topic, config.offsetsTopicPartitions, config.offsetsTopicReplicationFactor.toInt,
groupCoordinator.offsetsTopicConfigs)
Expand All @@ -1041,7 +1049,7 @@ class KafkaApis(val requestChannel: RequestChannel,
s"'${config.transactionTopicReplicationFactor}' for the transactions state topic (configured via " +
s"'${KafkaConfig.TransactionsTopicReplicationFactorProp}'). This error can be ignored if the cluster is starting up " +
s"and not all brokers are up yet.")
new MetadataResponse.TopicMetadata(Errors.COORDINATOR_NOT_AVAILABLE, topic, true, util.Collections.emptyList())
metadataResponseTopic(Errors.COORDINATOR_NOT_AVAILABLE, topic, true, util.Collections.emptyList())
} else {
createTopic(topic, config.transactionTopicPartitions, config.transactionTopicReplicationFactor.toInt,
txnCoordinator.transactionTopicConfigs)
Expand All @@ -1050,31 +1058,31 @@ class KafkaApis(val requestChannel: RequestChannel,
}
}

private def getOrCreateInternalTopic(topic: String, listenerName: ListenerName): MetadataResponse.TopicMetadata = {
private def getOrCreateInternalTopic(topic: String, listenerName: ListenerName): MetadataResponseData.MetadataResponseTopic = {
val topicMetadata = metadataCache.getTopicMetadata(Set(topic), listenerName)
topicMetadata.headOption.getOrElse(createInternalTopic(topic))
}

private def getTopicMetadata(allowAutoTopicCreation: Boolean, topics: Set[String], listenerName: ListenerName,
errorUnavailableEndpoints: Boolean,
errorUnavailableListeners: Boolean): Seq[MetadataResponse.TopicMetadata] = {
errorUnavailableListeners: Boolean): Seq[MetadataResponseTopic] = {
val topicResponses = metadataCache.getTopicMetadata(topics, listenerName,
errorUnavailableEndpoints, errorUnavailableListeners)
if (topics.isEmpty || topicResponses.size == topics.size) {
topicResponses
} else {
val nonExistentTopics = topics -- topicResponses.map(_.topic).toSet
val nonExistentTopics = topics -- topicResponses.map(_.name).toSet
val responsesForNonExistentTopics = nonExistentTopics.map { topic =>
if (isInternal(topic)) {
val topicMetadata = createInternalTopic(topic)
if (topicMetadata.error == Errors.COORDINATOR_NOT_AVAILABLE)
new MetadataResponse.TopicMetadata(Errors.INVALID_REPLICATION_FACTOR, topic, true, util.Collections.emptyList())
if (topicMetadata.errorCode == Errors.COORDINATOR_NOT_AVAILABLE.code)
metadataResponseTopic(Errors.INVALID_REPLICATION_FACTOR, topic, true, util.Collections.emptyList())
else
topicMetadata
} else if (allowAutoTopicCreation && config.autoCreateTopicsEnable) {
createTopic(topic, config.numPartitions, config.defaultReplicationFactor)
} else {
new MetadataResponse.TopicMetadata(Errors.UNKNOWN_TOPIC_OR_PARTITION, topic, false, util.Collections.emptyList())
metadataResponseTopic(Errors.UNKNOWN_TOPIC_OR_PARTITION, topic, false, util.Collections.emptyList())
}
}
topicResponses ++ responsesForNonExistentTopics
Expand Down Expand Up @@ -1109,17 +1117,17 @@ class KafkaApis(val requestChannel: RequestChannel,
}

val unauthorizedForCreateTopicMetadata = unauthorizedForCreateTopics.map(topic =>
new MetadataResponse.TopicMetadata(Errors.TOPIC_AUTHORIZATION_FAILED, topic, isInternal(topic),
util.Collections.emptyList()))
metadataResponseTopic(Errors.TOPIC_AUTHORIZATION_FAILED, topic, isInternal(topic), util.Collections.emptyList()))


// do not disclose the existence of topics unauthorized for Describe, so we've not even checked if they exist or not
val unauthorizedForDescribeTopicMetadata =
// In case of all topics, don't include topics unauthorized for Describe
if ((requestVersion == 0 && (metadataRequest.topics == null || metadataRequest.topics.isEmpty)) || metadataRequest.isAllTopics)
Set.empty[MetadataResponse.TopicMetadata]
Set.empty[MetadataResponseTopic]
else
unauthorizedForDescribeTopics.map(topic =>
new MetadataResponse.TopicMetadata(Errors.TOPIC_AUTHORIZATION_FAILED, topic, false, util.Collections.emptyList()))
metadataResponseTopic(Errors.TOPIC_AUTHORIZATION_FAILED, topic, false, util.Collections.emptyList()))

// In version 0, we returned an error when brokers with replicas were unavailable,
// while in higher versions we simply don't include the broker in the returned broker list
Expand All @@ -1129,7 +1137,7 @@ class KafkaApis(val requestChannel: RequestChannel,
val errorUnavailableListeners = requestVersion >= 6
val topicMetadata =
if (authorizedTopics.isEmpty)
Seq.empty[MetadataResponse.TopicMetadata]
Seq.empty[MetadataResponseTopic]
else
getTopicMetadata(metadataRequest.allowAutoTopicCreation, authorizedTopics, request.context.listenerName,
errorUnavailableEndpoints, errorUnavailableListeners)
Expand All @@ -1147,7 +1155,7 @@ class KafkaApis(val requestChannel: RequestChannel,
// get topic authorized operations
if (metadataRequest.data.includeTopicAuthorizedOperations) {
topicMetadata.foreach { topicData =>
topicData.authorizedOperations(authorizedOperations(request, new Resource(ResourceType.TOPIC, topicData.topic)))
topicData.setTopicAuthorizedOperations(authorizedOperations(request, new Resource(ResourceType.TOPIC, topicData.name)))
}
}
}
Expand All @@ -1162,10 +1170,10 @@ class KafkaApis(val requestChannel: RequestChannel,
sendResponseMaybeThrottle(request, requestThrottleMs =>
MetadataResponse.prepareResponse(
requestThrottleMs,
completeTopicMetadata.asJava,
brokers.flatMap(_.getNode(request.context.listenerName)).asJava,
clusterId,
metadataCache.getControllerId.getOrElse(MetadataResponse.NO_CONTROLLER_ID),
completeTopicMetadata.asJava,
clusterAuthorizedOperations
))
}
Expand Down Expand Up @@ -1285,13 +1293,13 @@ class KafkaApis(val requestChannel: RequestChannel,
.setPort(node.port)
.setThrottleTimeMs(requestThrottleMs))
}
val responseBody = if (topicMetadata.error != Errors.NONE) {
val responseBody = if (topicMetadata.errorCode != Errors.NONE.code) {
createFindCoordinatorResponse(Errors.COORDINATOR_NOT_AVAILABLE, Node.noNode)
} else {
val coordinatorEndpoint = topicMetadata.partitionMetadata.asScala
.find(_.partition == partition)
.filter(_.leaderId.isPresent)
.flatMap(metadata => metadataCache.getAliveBroker(metadata.leaderId.get))
val coordinatorEndpoint = topicMetadata.partitions.asScala
.find(_.partitionIndex == partition)
.filter(_.leaderId != MetadataResponse.NO_LEADER_ID)
.flatMap(metadata => metadataCache.getAliveBroker(metadata.leaderId))
.flatMap(_.getNode(request.context.listenerName))
.filterNot(_.isEmpty)

Expand Down
Loading