From 8a278e60bebded4a67b16077572d82a6e54e5143 Mon Sep 17 00:00:00 2001 From: "Colin P. Mccabe" Date: Fri, 19 Jan 2018 10:29:55 -0800 Subject: [PATCH] Minor: MetadataResponse#toStruct should serialize null leaders correctly. In MetadataResponse deserialization, if the partition leader key is set to -1, the leader is set to null. The MetadataResponse#toStruct code should handle this correctly as well. Also fix a case in KafkaApis where we were not taking into account the possibility of the leader being null. RequestResponseTest should test this as well. --- .../apache/kafka/common/requests/MetadataResponse.java | 8 ++++++-- .../apache/kafka/common/requests/RequestResponseTest.java | 3 +++ core/src/main/scala/kafka/server/KafkaApis.scala | 1 + 3 files changed, 10 insertions(+), 2 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java index 99c4ffb5fb5b6..cda3c0793b91a 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java @@ -464,6 +464,10 @@ public int partition() { return partition; } + public int leaderId() { + return leader == null ? -1 : leader.id(); + } + public Node leader() { return leader; } @@ -482,7 +486,7 @@ public List offlineReplicas() { @Override public String toString() { - return "(type=PartitionMetadata," + + return "(type=PartitionMetadata" + ", error=" + error + ", partition=" + partition + ", leader=" + leader + @@ -531,7 +535,7 @@ protected Struct toStruct(short version) { Struct partitionData = topicData.instance(PARTITION_METADATA_KEY_NAME); partitionData.set(ERROR_CODE, partitionMetadata.error.code()); partitionData.set(PARTITION_ID, partitionMetadata.partition); - partitionData.set(LEADER_KEY_NAME, partitionMetadata.leader.id()); + partitionData.set(LEADER_KEY_NAME, partitionMetadata.leaderId()); ArrayList replicas = new ArrayList<>(partitionMetadata.replicas.size()); for (Node node : partitionMetadata.replicas) replicas.add(node.id()); diff --git a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java index c18f5c2902971..27406167361bd 100644 --- a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java +++ b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java @@ -702,6 +702,9 @@ private MetadataResponse createMetadataResponse() { asList(new MetadataResponse.PartitionMetadata(Errors.NONE, 1, node, replicas, isr, offlineReplicas)))); allTopicMetadata.add(new MetadataResponse.TopicMetadata(Errors.LEADER_NOT_AVAILABLE, "topic2", false, Collections.emptyList())); + allTopicMetadata.add(new MetadataResponse.TopicMetadata(Errors.NONE, "topic3", false, + asList(new MetadataResponse.PartitionMetadata(Errors.LEADER_NOT_AVAILABLE, 0, null, + replicas, isr, offlineReplicas)))); return new MetadataResponse(asList(node), null, MetadataResponse.NO_CONTROLLER_ID, allTopicMetadata); } diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala index 1ff75c0ed9b56..13f516466679e 100644 --- a/core/src/main/scala/kafka/server/KafkaApis.scala +++ b/core/src/main/scala/kafka/server/KafkaApis.scala @@ -1105,6 +1105,7 @@ class KafkaApis(val requestChannel: RequestChannel, val coordinatorEndpoint = topicMetadata.partitionMetadata.asScala .find(_.partition == partition) .map(_.leader) + .flatMap(p => Option(p)) coordinatorEndpoint match { case Some(endpoint) if !endpoint.isEmpty =>