diff --git a/clients/src/main/java/org/apache/kafka/clients/Metadata.java b/clients/src/main/java/org/apache/kafka/clients/Metadata.java index 21545d8f1eb9c..8372cc0319134 100644 --- a/clients/src/main/java/org/apache/kafka/clients/Metadata.java +++ b/clients/src/main/java/org/apache/kafka/clients/Metadata.java @@ -292,6 +292,8 @@ private MetadataCache handleMetadataResponse(MetadataResponse metadataResponse, Predicate topicsToRetain) { Set internalTopics = new HashSet<>(); List partitions = new ArrayList<>(); + Map brokersById = metadataResponse.brokersById(); + for (MetadataResponse.TopicMetadata metadata : metadataResponse.topicMetadata()) { if (!topicsToRetain.test(metadata)) continue; @@ -299,12 +301,30 @@ private MetadataCache handleMetadataResponse(MetadataResponse metadataResponse, if (metadata.error() == Errors.NONE) { if (metadata.isInternal()) internalTopics.add(metadata.topic()); - for (MetadataResponse.PartitionMetadata partitionMetadata : metadata.partitionMetadata()) { + for (MetadataResponse.PartitionMetadata partitionMetadata : metadata.partitionMetadata()) { // Even if the partition's metadata includes an error, we need to handle the update to catch new epochs updatePartitionInfo(metadata.topic(), partitionMetadata, partitionInfo -> { int epoch = partitionMetadata.leaderEpoch().orElse(RecordBatch.NO_PARTITION_LEADER_EPOCH); - partitions.add(new MetadataCache.PartitionInfoAndEpoch(partitionInfo, epoch)); + Node leader = partitionInfo.leader(); + + if (leader != null && !leader.equals(brokersById.get(leader.id()))) { + // If we are reusing metadata from a previous response (which is possible if it + // contained a larger epoch), we may not have leader information available in the + // latest response. To keep the state consistent, we override the partition metadata + // so that the leader is set consistently with the broker metadata + + PartitionInfo partitionInfoWithoutLeader = new PartitionInfo( + partitionInfo.topic(), + partitionInfo.partition(), + brokersById.get(leader.id()), + partitionInfo.replicas(), + partitionInfo.inSyncReplicas(), + partitionInfo.offlineReplicas()); + partitions.add(new MetadataCache.PartitionInfoAndEpoch(partitionInfoWithoutLeader, epoch)); + } else { + partitions.add(new MetadataCache.PartitionInfoAndEpoch(partitionInfo, epoch)); + } }); if (partitionMetadata.error().exception() instanceof InvalidMetadataException) { @@ -319,7 +339,7 @@ private MetadataCache handleMetadataResponse(MetadataResponse metadataResponse, } } - return new MetadataCache(metadataResponse.clusterId(), new ArrayList<>(metadataResponse.brokers()), partitions, + return new MetadataCache(metadataResponse.clusterId(), brokersById.values(), partitions, metadataResponse.topicsByError(Errors.TOPIC_AUTHORIZATION_FAILED), metadataResponse.topicsByError(Errors.INVALID_TOPIC_EXCEPTION), internalTopics, metadataResponse.controller()); diff --git a/clients/src/main/java/org/apache/kafka/clients/MetadataCache.java b/clients/src/main/java/org/apache/kafka/clients/MetadataCache.java index e58da12127365..10b5cc92e7694 100644 --- a/clients/src/main/java/org/apache/kafka/clients/MetadataCache.java +++ b/clients/src/main/java/org/apache/kafka/clients/MetadataCache.java @@ -39,7 +39,7 @@ */ public class MetadataCache { private final String clusterId; - private final List nodes; + private final Collection nodes; private final Set unauthorizedTopics; private final Set invalidTopics; private final Set internalTopics; @@ -49,7 +49,7 @@ public class MetadataCache { private Cluster clusterInstance; MetadataCache(String clusterId, - List nodes, + Collection nodes, Collection partitions, Set unauthorizedTopics, Set invalidTopics, @@ -59,7 +59,7 @@ public class MetadataCache { } MetadataCache(String clusterId, - List nodes, + Collection nodes, Collection partitions, Set unauthorizedTopics, Set invalidTopics, diff --git a/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java index ef5381b444376..350da2b7dfebb 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java @@ -20,9 +20,9 @@ import org.apache.kafka.common.Node; import org.apache.kafka.common.PartitionInfo; import org.apache.kafka.common.message.MetadataResponseData; -import org.apache.kafka.common.message.MetadataResponseData.MetadataResponseTopic; -import org.apache.kafka.common.message.MetadataResponseData.MetadataResponsePartition; import org.apache.kafka.common.message.MetadataResponseData.MetadataResponseBroker; +import org.apache.kafka.common.message.MetadataResponseData.MetadataResponsePartition; +import org.apache.kafka.common.message.MetadataResponseData.MetadataResponseTopic; import org.apache.kafka.common.protocol.ApiKeys; import org.apache.kafka.common.protocol.Errors; import org.apache.kafka.common.protocol.types.Struct; @@ -32,7 +32,6 @@ import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Collection; -import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.List; @@ -178,6 +177,14 @@ private Holder holder() { * @return the brokers */ public Collection brokers() { + return holder().brokers.values(); + } + + /** + * Get a map of all brokers keyed by the brokerId. + * @return A map from the brokerId to the broker `Node` information + */ + public Map brokersById() { return holder().brokers; } @@ -360,23 +367,22 @@ public String toString() { } private static class Holder { - private final Collection brokers; + private final Map brokers; private final Node controller; private final Collection topicMetadata; Holder(MetadataResponseData data) { - this.brokers = Collections.unmodifiableCollection(createBrokers(data)); - Map brokerMap = brokers.stream().collect(Collectors.toMap(Node::id, b -> b)); - this.topicMetadata = createTopicMetadata(data, brokerMap); - this.controller = brokerMap.get(data.controllerId()); + this.brokers = createBrokers(data); + this.topicMetadata = createTopicMetadata(data); + this.controller = brokers.get(data.controllerId()); } - private Collection createBrokers(MetadataResponseData data) { - return data.brokers().valuesList().stream().map(b -> - new Node(b.nodeId(), b.host(), b.port(), b.rack())).collect(Collectors.toList()); + private Map createBrokers(MetadataResponseData data) { + return data.brokers().valuesList().stream().map(b -> new Node(b.nodeId(), b.host(), b.port(), b.rack())) + .collect(Collectors.toMap(Node::id, b -> b)); } - private Collection createTopicMetadata(MetadataResponseData data, Map brokerMap) { + private Collection createTopicMetadata(MetadataResponseData data) { List topicMetadataList = new ArrayList<>(); for (MetadataResponseTopic topicMetadata : data.topics()) { Errors topicError = Errors.forCode(topicMetadata.errorCode()); @@ -389,10 +395,10 @@ private Collection createTopicMetadata(MetadataResponseData data, int partitionIndex = partitionMetadata.partitionIndex(); int leader = partitionMetadata.leaderId(); Optional leaderEpoch = RequestUtils.getLeaderEpoch(partitionMetadata.leaderEpoch()); - Node leaderNode = leader == -1 ? null : brokerMap.get(leader); - List replicaNodes = convertToNodes(brokerMap, partitionMetadata.replicaNodes()); - List isrNodes = convertToNodes(brokerMap, partitionMetadata.isrNodes()); - List offlineNodes = convertToNodes(brokerMap, partitionMetadata.offlineReplicas()); + Node leaderNode = leader == -1 ? null : brokers.get(leader); + List replicaNodes = convertToNodes(partitionMetadata.replicaNodes()); + List isrNodes = convertToNodes(partitionMetadata.isrNodes()); + List offlineNodes = convertToNodes(partitionMetadata.offlineReplicas()); partitionMetadataList.add(new PartitionMetadata(partitionError, partitionIndex, leaderNode, leaderEpoch, replicaNodes, isrNodes, offlineNodes)); } @@ -403,7 +409,7 @@ private Collection createTopicMetadata(MetadataResponseData data, return topicMetadataList; } - private List convertToNodes(Map brokers, List brokerIds) { + private List convertToNodes(List brokerIds) { List nodes = new ArrayList<>(brokerIds.size()); for (Integer brokerId : brokerIds) { Node node = brokers.get(brokerId); diff --git a/clients/src/test/java/org/apache/kafka/clients/MetadataTest.java b/clients/src/test/java/org/apache/kafka/clients/MetadataTest.java index bb340941b21fe..9a9026d6da3a7 100644 --- a/clients/src/test/java/org/apache/kafka/clients/MetadataTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/MetadataTest.java @@ -23,6 +23,12 @@ import org.apache.kafka.common.errors.TopicAuthorizationException; import org.apache.kafka.common.internals.ClusterResourceListeners; import org.apache.kafka.common.internals.Topic; +import org.apache.kafka.common.message.MetadataResponseData; +import org.apache.kafka.common.message.MetadataResponseData.MetadataResponseBroker; +import org.apache.kafka.common.message.MetadataResponseData.MetadataResponseBrokerCollection; +import org.apache.kafka.common.message.MetadataResponseData.MetadataResponsePartition; +import org.apache.kafka.common.message.MetadataResponseData.MetadataResponseTopic; +import org.apache.kafka.common.message.MetadataResponseData.MetadataResponseTopicCollection; import org.apache.kafka.common.protocol.Errors; import org.apache.kafka.common.requests.MetadataResponse; import org.apache.kafka.common.utils.LogContext; @@ -33,9 +39,12 @@ import org.junit.Test; import java.net.InetSocketAddress; +import java.util.Arrays; import java.util.Collections; import java.util.HashMap; +import java.util.List; import java.util.Map; +import java.util.Optional; import static org.apache.kafka.test.TestUtils.assertOptional; import static org.junit.Assert.assertEquals; @@ -495,4 +504,78 @@ public void testNodeIfOffline() { assertEquals(metadata.fetch().nodeById(0).id(), 0); assertEquals(metadata.fetch().nodeById(1).id(), 1); } + + @Test + public void testLeaderMetadataInconsistentWithBrokerMetadata() { + // Tests a reordering scenario which can lead to inconsistent leader state. + // A partition initially has one broker offline. That broker comes online and + // is elected leader. The client sees these two events in the opposite order. + + TopicPartition tp = new TopicPartition("topic", 0); + + Node node0 = new Node(0, "localhost", 9092); + Node node1 = new Node(1, "localhost", 9093); + Node node2 = new Node(2, "localhost", 9094); + + // The first metadata received by broker (epoch=10) + MetadataResponsePartition firstPartitionMetadata = new MetadataResponsePartition() + .setPartitionIndex(tp.partition()) + .setErrorCode(Errors.NONE.code()) + .setLeaderEpoch(10) + .setLeaderId(0) + .setReplicaNodes(Arrays.asList(0, 1, 2)) + .setIsrNodes(Arrays.asList(0, 1, 2)) + .setOfflineReplicas(Collections.emptyList()); + + // The second metadata received has stale metadata (epoch=8) + MetadataResponsePartition secondPartitionMetadata = new MetadataResponsePartition() + .setPartitionIndex(tp.partition()) + .setErrorCode(Errors.NONE.code()) + .setLeaderEpoch(8) + .setLeaderId(1) + .setReplicaNodes(Arrays.asList(0, 1, 2)) + .setIsrNodes(Arrays.asList(1, 2)) + .setOfflineReplicas(Collections.singletonList(0)); + + metadata.update(new MetadataResponse(new MetadataResponseData() + .setTopics(buildTopicCollection(tp.topic(), firstPartitionMetadata)) + .setBrokers(buildBrokerCollection(Arrays.asList(node0, node1, node2)))), + 10L); + + metadata.update(new MetadataResponse(new MetadataResponseData() + .setTopics(buildTopicCollection(tp.topic(), secondPartitionMetadata)) + .setBrokers(buildBrokerCollection(Arrays.asList(node1, node2)))), + 20L); + + assertNull(metadata.fetch().leaderFor(tp)); + assertEquals(Optional.of(10), metadata.lastSeenLeaderEpoch(tp)); + assertTrue(metadata.leaderAndEpoch(tp).leader.isEmpty()); + } + + private MetadataResponseTopicCollection buildTopicCollection(String topic, MetadataResponsePartition partitionMetadata) { + MetadataResponseTopic topicMetadata = new MetadataResponseTopic() + .setErrorCode(Errors.NONE.code()) + .setName(topic) + .setIsInternal(false); + + topicMetadata.setPartitions(Collections.singletonList(partitionMetadata)); + + MetadataResponseTopicCollection topics = new MetadataResponseTopicCollection(); + topics.add(topicMetadata); + return topics; + } + + private MetadataResponseBrokerCollection buildBrokerCollection(List nodes) { + MetadataResponseBrokerCollection brokers = new MetadataResponseBrokerCollection(); + for (Node node : nodes) { + MetadataResponseBroker broker = new MetadataResponseBroker() + .setNodeId(node.id()) + .setHost(node.host()) + .setPort(node.port()) + .setRack(node.rack()); + brokers.add(broker); + } + return brokers; + } + }