diff --git a/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java index 99c4ffb5fb5b6..cda3c0793b91a 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java @@ -464,6 +464,10 @@ public int partition() { return partition; } + public int leaderId() { + return leader == null ? -1 : leader.id(); + } + public Node leader() { return leader; } @@ -482,7 +486,7 @@ public List offlineReplicas() { @Override public String toString() { - return "(type=PartitionMetadata," + + return "(type=PartitionMetadata" + ", error=" + error + ", partition=" + partition + ", leader=" + leader + @@ -531,7 +535,7 @@ protected Struct toStruct(short version) { Struct partitionData = topicData.instance(PARTITION_METADATA_KEY_NAME); partitionData.set(ERROR_CODE, partitionMetadata.error.code()); partitionData.set(PARTITION_ID, partitionMetadata.partition); - partitionData.set(LEADER_KEY_NAME, partitionMetadata.leader.id()); + partitionData.set(LEADER_KEY_NAME, partitionMetadata.leaderId()); ArrayList replicas = new ArrayList<>(partitionMetadata.replicas.size()); for (Node node : partitionMetadata.replicas) replicas.add(node.id()); diff --git a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java index c18f5c2902971..27406167361bd 100644 --- a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java +++ b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java @@ -702,6 +702,9 @@ private MetadataResponse createMetadataResponse() { asList(new MetadataResponse.PartitionMetadata(Errors.NONE, 1, node, replicas, isr, offlineReplicas)))); allTopicMetadata.add(new MetadataResponse.TopicMetadata(Errors.LEADER_NOT_AVAILABLE, "topic2", false, Collections.emptyList())); + allTopicMetadata.add(new MetadataResponse.TopicMetadata(Errors.NONE, "topic3", false, + asList(new MetadataResponse.PartitionMetadata(Errors.LEADER_NOT_AVAILABLE, 0, null, + replicas, isr, offlineReplicas)))); return new MetadataResponse(asList(node), null, MetadataResponse.NO_CONTROLLER_ID, allTopicMetadata); } diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala index 1ff75c0ed9b56..13f516466679e 100644 --- a/core/src/main/scala/kafka/server/KafkaApis.scala +++ b/core/src/main/scala/kafka/server/KafkaApis.scala @@ -1105,6 +1105,7 @@ class KafkaApis(val requestChannel: RequestChannel, val coordinatorEndpoint = topicMetadata.partitionMetadata.asScala .find(_.partition == partition) .map(_.leader) + .flatMap(p => Option(p)) coordinatorEndpoint match { case Some(endpoint) if !endpoint.isEmpty =>