Merged
26 changes: 23 additions & 3 deletions clients/src/main/java/org/apache/kafka/clients/Metadata.java
@@ -292,19 +292,39 @@ private MetadataCache handleMetadataResponse(MetadataResponse metadataResponse,
                                                  Predicate<MetadataResponse.TopicMetadata> topicsToRetain) {
         Set<String> internalTopics = new HashSet<>();
         List<MetadataCache.PartitionInfoAndEpoch> partitions = new ArrayList<>();
+        Map<Integer, Node> brokersById = metadataResponse.brokersById();
 
         for (MetadataResponse.TopicMetadata metadata : metadataResponse.topicMetadata()) {
             if (!topicsToRetain.test(metadata))
                 continue;
 
             if (metadata.error() == Errors.NONE) {
                 if (metadata.isInternal())
                     internalTopics.add(metadata.topic());
-                for (MetadataResponse.PartitionMetadata partitionMetadata : metadata.partitionMetadata()) {
+
+                for (MetadataResponse.PartitionMetadata partitionMetadata : metadata.partitionMetadata()) {
                     // Even if the partition's metadata includes an error, we need to handle the update to catch new epochs
                     updatePartitionInfo(metadata.topic(), partitionMetadata, partitionInfo -> {
                         int epoch = partitionMetadata.leaderEpoch().orElse(RecordBatch.NO_PARTITION_LEADER_EPOCH);
-                        partitions.add(new MetadataCache.PartitionInfoAndEpoch(partitionInfo, epoch));
+                        Node leader = partitionInfo.leader();
+
+                        if (leader != null && !leader.equals(brokersById.get(leader.id()))) {
+                            // If we are reusing metadata from a previous response (which is possible if it
+                            // contained a larger epoch), we may not have leader information available in the
+                            // latest response. To keep the state consistent, we override the partition metadata
+                            // so that the leader is set consistently with the broker metadata
+
+                            PartitionInfo partitionInfoWithoutLeader = new PartitionInfo(
+                                    partitionInfo.topic(),
+                                    partitionInfo.partition(),
+                                    brokersById.get(leader.id()),
+                                    partitionInfo.replicas(),
+                                    partitionInfo.inSyncReplicas(),
+                                    partitionInfo.offlineReplicas());
+                            partitions.add(new MetadataCache.PartitionInfoAndEpoch(partitionInfoWithoutLeader, epoch));
+                        } else {
+                            partitions.add(new MetadataCache.PartitionInfoAndEpoch(partitionInfo, epoch));
+                        }
                     });
 
                     if (partitionMetadata.error().exception() instanceof InvalidMetadataException) {
@@ -319,7 +339,7 @@ private MetadataCache handleMetadataResponse(MetadataResponse metadataResponse,
             }
         }
 
-        return new MetadataCache(metadataResponse.clusterId(), new ArrayList<>(metadataResponse.brokers()), partitions,
+        return new MetadataCache(metadataResponse.clusterId(), brokersById.values(), partitions,
                 metadataResponse.topicsByError(Errors.TOPIC_AUTHORIZATION_FAILED),
                 metadataResponse.topicsByError(Errors.INVALID_TOPIC_EXCEPTION),
                 internalTopics, metadataResponse.controller());
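For readers skimming the diff, here is the override rule in isolation: if the retained partition metadata names a leader that is missing from, or inconsistent with, the broker map of the latest response, the leader field is rewritten from that map, possibly to null. A minimal sketch under the assumption that kafka-clients is on the classpath; the `LeaderReconciliationSketch` class and `reconcileLeader` helper are hypothetical, not part of the PR:

```java
import java.util.Map;

import org.apache.kafka.common.Node;
import org.apache.kafka.common.PartitionInfo;

class LeaderReconciliationSketch {
    // Hypothetical restatement of the override above: keep the PartitionInfo
    // as-is when its leader matches the broker map, otherwise rebuild it with
    // the leader taken from the map (null when the broker is gone).
    static PartitionInfo reconcileLeader(PartitionInfo info, Map<Integer, Node> brokersById) {
        Node leader = info.leader();
        if (leader == null || leader.equals(brokersById.get(leader.id())))
            return info;
        return new PartitionInfo(info.topic(), info.partition(),
                brokersById.get(leader.id()), info.replicas(),
                info.inSyncReplicas(), info.offlineReplicas());
    }
}
```

Note the asymmetry this preserves: the broker list from the latest response always wins, while higher-epoch partition metadata is retained, which is exactly the combination the new test below exercises.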
clients/src/main/java/org/apache/kafka/clients/MetadataCache.java

@@ -39,7 +39,7 @@
  */
 public class MetadataCache {
     private final String clusterId;
-    private final List<Node> nodes;
+    private final Collection<Node> nodes;
     private final Set<String> unauthorizedTopics;
     private final Set<String> invalidTopics;
     private final Set<String> internalTopics;
@@ -49,7 +49,7 @@ public class MetadataCache {
     private Cluster clusterInstance;
 
     MetadataCache(String clusterId,
-                  List<Node> nodes,
+                  Collection<Node> nodes,
                   Collection<PartitionInfoAndEpoch> partitions,
                   Set<String> unauthorizedTopics,
                   Set<String> invalidTopics,
@@ -59,7 +59,7 @@ public class MetadataCache {
     }
 
     MetadataCache(String clusterId,
-                  List<Node> nodes,
+                  Collection<Node> nodes,
                   Collection<PartitionInfoAndEpoch> partitions,
                   Set<String> unauthorizedTopics,
                   Set<String> invalidTopics,
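The `List<Node>` to `Collection<Node>` widening in these constructors appears to exist so that the `values()` view of the new broker map can be passed through without copying; `Map#values()` returns a `Collection`, not a `List`. A small illustration (class and method names are mine, not the PR's; assumes kafka-clients on the classpath):

```java
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.common.Node;

class CollectionWideningSketch {
    static Collection<Node> nodes(Map<Integer, Node> brokersById) {
        // values() is a live Collection view over the map; accepting
        // Collection<Node> in MetadataCache lets callers hand it over
        // directly instead of wrapping it in a new ArrayList.
        return brokersById.values();
    }

    public static void main(String[] args) {
        Map<Integer, Node> brokersById = new HashMap<>();
        brokersById.put(0, new Node(0, "localhost", 9092));
        System.out.println(nodes(brokersById).size()); // prints 1
    }
}
```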
clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java

@@ -20,9 +20,9 @@
 import org.apache.kafka.common.Node;
 import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.message.MetadataResponseData;
-import org.apache.kafka.common.message.MetadataResponseData.MetadataResponseTopic;
-import org.apache.kafka.common.message.MetadataResponseData.MetadataResponsePartition;
 import org.apache.kafka.common.message.MetadataResponseData.MetadataResponseBroker;
+import org.apache.kafka.common.message.MetadataResponseData.MetadataResponsePartition;
+import org.apache.kafka.common.message.MetadataResponseData.MetadataResponseTopic;
 import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.protocol.types.Struct;
@@ -32,7 +32,6 @@
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Collection;
-import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
@@ -178,6 +177,14 @@ private Holder holder() {
      * @return the brokers
      */
     public Collection<Node> brokers() {
+        return holder().brokers.values();
+    }
+
+    /**
+     * Get a map of all brokers keyed by the brokerId.
+     * @return A map from the brokerId to the broker `Node` information
+     */
+    public Map<Integer, Node> brokersById() {
         return holder().brokers;
     }
 
@@ -360,23 +367,22 @@ public String toString() {
     }
 
     private static class Holder {
-        private final Collection<Node> brokers;
+        private final Map<Integer, Node> brokers;
         private final Node controller;
         private final Collection<TopicMetadata> topicMetadata;
 
         Holder(MetadataResponseData data) {
-            this.brokers = Collections.unmodifiableCollection(createBrokers(data));
-            Map<Integer, Node> brokerMap = brokers.stream().collect(Collectors.toMap(Node::id, b -> b));
-            this.topicMetadata = createTopicMetadata(data, brokerMap);
-            this.controller = brokerMap.get(data.controllerId());
+            this.brokers = createBrokers(data);
+            this.topicMetadata = createTopicMetadata(data);
+            this.controller = brokers.get(data.controllerId());
         }
 
-        private Collection<Node> createBrokers(MetadataResponseData data) {
-            return data.brokers().valuesList().stream().map(b ->
-                    new Node(b.nodeId(), b.host(), b.port(), b.rack())).collect(Collectors.toList());
+        private Map<Integer, Node> createBrokers(MetadataResponseData data) {
+            return data.brokers().valuesList().stream().map(b -> new Node(b.nodeId(), b.host(), b.port(), b.rack()))
+                    .collect(Collectors.toMap(Node::id, b -> b));
         }
 
-        private Collection<TopicMetadata> createTopicMetadata(MetadataResponseData data, Map<Integer, Node> brokerMap) {
+        private Collection<TopicMetadata> createTopicMetadata(MetadataResponseData data) {
             List<TopicMetadata> topicMetadataList = new ArrayList<>();
             for (MetadataResponseTopic topicMetadata : data.topics()) {
                 Errors topicError = Errors.forCode(topicMetadata.errorCode());
@@ -389,10 +395,10 @@ private Collection<TopicMetadata> createTopicMetadata(MetadataResponseData data,
                     int partitionIndex = partitionMetadata.partitionIndex();
                     int leader = partitionMetadata.leaderId();
                     Optional<Integer> leaderEpoch = RequestUtils.getLeaderEpoch(partitionMetadata.leaderEpoch());
-                    Node leaderNode = leader == -1 ? null : brokerMap.get(leader);
-                    List<Node> replicaNodes = convertToNodes(brokerMap, partitionMetadata.replicaNodes());
-                    List<Node> isrNodes = convertToNodes(brokerMap, partitionMetadata.isrNodes());
-                    List<Node> offlineNodes = convertToNodes(brokerMap, partitionMetadata.offlineReplicas());
+                    Node leaderNode = leader == -1 ? null : brokers.get(leader);
+                    List<Node> replicaNodes = convertToNodes(partitionMetadata.replicaNodes());
+                    List<Node> isrNodes = convertToNodes(partitionMetadata.isrNodes());
+                    List<Node> offlineNodes = convertToNodes(partitionMetadata.offlineReplicas());
                     partitionMetadataList.add(new PartitionMetadata(partitionError, partitionIndex, leaderNode, leaderEpoch,
                             replicaNodes, isrNodes, offlineNodes));
                 }
@@ -403,7 +409,7 @@ private Collection<TopicMetadata> createTopicMetadata(MetadataResponseData data,
             return topicMetadataList;
         }
 
-        private List<Node> convertToNodes(Map<Integer, Node> brokers, List<Integer> brokerIds) {
+        private List<Node> convertToNodes(List<Integer> brokerIds) {
             List<Node> nodes = new ArrayList<>(brokerIds.size());
             for (Integer brokerId : brokerIds) {
                 Node node = brokers.get(brokerId);
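The `createBrokers` rewrite keys brokers by id with `Collectors.toMap(Node::id, b -> b)` instead of collecting a list and building the map separately. A self-contained sketch of the collector's behavior (broker addresses are made up; assumes kafka-clients on the classpath):

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

import org.apache.kafka.common.Node;

class BrokerMapSketch {
    public static void main(String[] args) {
        List<Node> brokers = Arrays.asList(
                new Node(0, "localhost", 9092),
                new Node(1, "localhost", 9093));
        // Key each Node by its broker id. Collectors.toMap throws
        // IllegalStateException on duplicate keys, which is acceptable here
        // because broker ids are unique within a metadata response.
        Map<Integer, Node> brokersById = brokers.stream()
                .collect(Collectors.toMap(Node::id, b -> b));
        System.out.println(brokersById.get(1).port()); // prints 9093
    }
}
```

One behavioral nuance visible in the diff: the old code wrapped the brokers in `Collections.unmodifiableCollection`, while the `HashMap` produced by `toMap` is mutable, so callers of the new `brokersById()` receive the internal map directly.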
83 changes: 83 additions & 0 deletions clients/src/test/java/org/apache/kafka/clients/MetadataTest.java
@@ -23,6 +23,12 @@
 import org.apache.kafka.common.errors.TopicAuthorizationException;
 import org.apache.kafka.common.internals.ClusterResourceListeners;
 import org.apache.kafka.common.internals.Topic;
+import org.apache.kafka.common.message.MetadataResponseData;
+import org.apache.kafka.common.message.MetadataResponseData.MetadataResponseBroker;
+import org.apache.kafka.common.message.MetadataResponseData.MetadataResponseBrokerCollection;
+import org.apache.kafka.common.message.MetadataResponseData.MetadataResponsePartition;
+import org.apache.kafka.common.message.MetadataResponseData.MetadataResponseTopic;
+import org.apache.kafka.common.message.MetadataResponseData.MetadataResponseTopicCollection;
 import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.requests.MetadataResponse;
 import org.apache.kafka.common.utils.LogContext;
@@ -33,9 +39,12 @@
 import org.junit.Test;
 
 import java.net.InetSocketAddress;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 
 import static org.apache.kafka.test.TestUtils.assertOptional;
 import static org.junit.Assert.assertEquals;
@@ -495,4 +504,78 @@ public void testNodeIfOffline() {
         assertEquals(metadata.fetch().nodeById(0).id(), 0);
         assertEquals(metadata.fetch().nodeById(1).id(), 1);
     }
+
+    @Test
+    public void testLeaderMetadataInconsistentWithBrokerMetadata() {
+        // Tests a reordering scenario which can lead to inconsistent leader state.
+        // A partition initially has one broker offline. That broker comes online and
+        // is elected leader. The client sees these two events in the opposite order.
+
+        TopicPartition tp = new TopicPartition("topic", 0);
+
+        Node node0 = new Node(0, "localhost", 9092);
+        Node node1 = new Node(1, "localhost", 9093);
+        Node node2 = new Node(2, "localhost", 9094);
+
+        // The first metadata response received by the client (epoch=10)
+        MetadataResponsePartition firstPartitionMetadata = new MetadataResponsePartition()
+                .setPartitionIndex(tp.partition())
+                .setErrorCode(Errors.NONE.code())
+                .setLeaderEpoch(10)
+                .setLeaderId(0)
+                .setReplicaNodes(Arrays.asList(0, 1, 2))
+                .setIsrNodes(Arrays.asList(0, 1, 2))
+                .setOfflineReplicas(Collections.emptyList());
+
+        // The second metadata response received is stale (epoch=8)
+        MetadataResponsePartition secondPartitionMetadata = new MetadataResponsePartition()
+                .setPartitionIndex(tp.partition())
+                .setErrorCode(Errors.NONE.code())
+                .setLeaderEpoch(8)
+                .setLeaderId(1)
+                .setReplicaNodes(Arrays.asList(0, 1, 2))
+                .setIsrNodes(Arrays.asList(1, 2))
+                .setOfflineReplicas(Collections.singletonList(0));
+
+        metadata.update(new MetadataResponse(new MetadataResponseData()
+                .setTopics(buildTopicCollection(tp.topic(), firstPartitionMetadata))
+                .setBrokers(buildBrokerCollection(Arrays.asList(node0, node1, node2)))),
+            10L);
+
+        metadata.update(new MetadataResponse(new MetadataResponseData()
+                .setTopics(buildTopicCollection(tp.topic(), secondPartitionMetadata))
+                .setBrokers(buildBrokerCollection(Arrays.asList(node1, node2)))),
+            20L);
+
+        assertNull(metadata.fetch().leaderFor(tp));
+        assertEquals(Optional.of(10), metadata.lastSeenLeaderEpoch(tp));
+        assertFalse(metadata.leaderAndEpoch(tp).leader.isPresent());
+    }
+
+    private MetadataResponseTopicCollection buildTopicCollection(String topic, MetadataResponsePartition partitionMetadata) {
+        MetadataResponseTopic topicMetadata = new MetadataResponseTopic()
+                .setErrorCode(Errors.NONE.code())
+                .setName(topic)
+                .setIsInternal(false);
+
+        topicMetadata.setPartitions(Collections.singletonList(partitionMetadata));
+
+        MetadataResponseTopicCollection topics = new MetadataResponseTopicCollection();
+        topics.add(topicMetadata);
+        return topics;
+    }
+
+    private MetadataResponseBrokerCollection buildBrokerCollection(List<Node> nodes) {
+        MetadataResponseBrokerCollection brokers = new MetadataResponseBrokerCollection();
+        for (Node node : nodes) {
+            MetadataResponseBroker broker = new MetadataResponseBroker()
+                    .setNodeId(node.id())
+                    .setHost(node.host())
+                    .setPort(node.port())
+                    .setRack(node.rack());
+            brokers.add(broker);
+        }
+        return brokers;
+    }
+
 }
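The assertions hinge on Metadata's per-partition epoch rule: partition state from a response is applied only when its leader epoch is at least the last one seen, while the broker list is always replaced. A hedged condensation of that rule; the class and helper below are illustrative only, the real check appears to live behind `Metadata#updatePartitionInfo`:

```java
import java.util.Optional;

class EpochRuleSketch {
    // Illustrative only: metadata for a partition should replace the cached
    // state when its epoch is at least the last seen epoch; otherwise the
    // cached (higher-epoch) state is retained.
    static boolean shouldApply(Optional<Integer> responseEpoch, int lastSeenEpoch) {
        return responseEpoch.map(epoch -> epoch >= lastSeenEpoch).orElse(true);
    }

    public static void main(String[] args) {
        System.out.println(shouldApply(Optional.of(8), 10));  // false: the test's stale response
        System.out.println(shouldApply(Optional.of(11), 10)); // true: genuinely newer metadata
    }
}
```

In the test, the epoch-8 response therefore cannot displace the epoch-10 partition state, but its broker list (without node 0) does displace the cached brokers, so the retained leader can no longer be resolved and `leaderFor(tp)` returns null.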