Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -345,7 +345,17 @@ public synchronized Set<TopicPartition> pausedPartitions() {
* of the current generation; otherwise it returns the same set as {@link #subscription()}
*/
synchronized Set<String> metadataTopics() {
return groupSubscription.isEmpty() ? subscription : groupSubscription;
if (groupSubscription.isEmpty())
return subscription;
else if (groupSubscription.containsAll(subscription))
return groupSubscription;
else {
// When subscription changes `groupSubscription` may be outdated, ensure that
// new subscription topics are returned.
Set<String> topics = new HashSet<>(groupSubscription);
topics.addAll(subscription);
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This seems consistent with needsMetadata method below.

Copy link
Copy Markdown
Contributor

@hachikuji hachikuji May 29, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree this change seems to make sense. I'm trying to understand the edge case a little bit better. It seems the basic scenario is the following:

  1. user calls subscribe. subscription is updated to (A), while group subscription might be (B)
  2. we call requestUpdateForNewTopics which bumps the request version
  3. metadata update gets triggered and requests (B) with the bumped request version

At this point, no further metadata update will be sent, but the consumer should rebalance. The part that confuses me a little bit is that we don't request a metadata update following the rebalance.

I guess it is due to SubscriptionState.groupSubscribe? Assuming that we remain the leader, if (A) is the only topic subscribed, then we will first change groupSubscription to (A). Then we will not request a new metadata update because groupSubscription matches subscription.

Alternatively, if we are not the leader, we will call resetGroupSubscription, which will set groupSubscription to (), but will not request an update.

Do I have that right?

Copy link
Copy Markdown
Contributor Author

@rajinisivaram rajinisivaram May 29, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@hachikuji Thank you, yes, that is exactly right.

return topics;
}
}

synchronized boolean needsMetadata(String topic) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -782,6 +782,35 @@ public void testOutdatedCoordinatorAssignment() {
assertEquals(assigned, rebalanceListener.assigned);
}

@Test
public void testMetadataTopicsDuringSubscriptionChange() {
final String consumerId = "subscription_change";
final List<String> oldSubscription = singletonList(topic1);
final List<TopicPartition> oldAssignment = Collections.singletonList(t1p);
final List<String> newSubscription = singletonList(topic2);
final List<TopicPartition> newAssignment = Collections.singletonList(t2p);

subscriptions.subscribe(toSet(oldSubscription), rebalanceListener);
assertEquals(toSet(oldSubscription), subscriptions.metadataTopics());

client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE));

prepareJoinAndSyncResponse(consumerId, 1, oldSubscription, oldAssignment);

coordinator.poll(time.timer(0));
assertEquals(toSet(oldSubscription), subscriptions.metadataTopics());

subscriptions.subscribe(toSet(newSubscription), rebalanceListener);
assertEquals(Utils.mkSet(topic1, topic2), subscriptions.metadataTopics());

prepareJoinAndSyncResponse(consumerId, 2, newSubscription, newAssignment);
coordinator.poll(time.timer(Long.MAX_VALUE));
assertFalse(coordinator.rejoinNeededOrPending());
assertEquals(toSet(newAssignment), subscriptions.assignedPartitions());
assertEquals(toSet(newSubscription), subscriptions.metadataTopics());
}

@Test
public void testPatternJoinGroupLeader() {
final String consumerId = "leader";
Expand Down Expand Up @@ -3107,6 +3136,19 @@ private void prepareOffsetCommitRequest(final Map<TopicPartition, Long> expected
client.prepareResponse(offsetCommitRequestMatcher(expectedOffsets), offsetCommitResponse(errors), disconnected);
}

private void prepareJoinAndSyncResponse(String consumerId, int generation, List<String> subscription, List<TopicPartition> assignment) {
partitionAssignor.prepare(singletonMap(consumerId, assignment));
client.prepareResponse(
joinGroupLeaderResponse(
generation, consumerId, singletonMap(consumerId, subscription), Errors.NONE));
client.prepareResponse(body -> {
SyncGroupRequest sync = (SyncGroupRequest) body;
return sync.data.memberId().equals(consumerId) &&
sync.data.generationId() == generation &&
sync.groupAssignments().containsKey(consumerId);
}, syncGroupResponse(assignment, Errors.NONE));
}

private Map<TopicPartition, Errors> partitionErrors(Collection<TopicPartition> partitions, Errors error) {
final Map<TopicPartition, Errors> errors = new HashMap<>();
for (TopicPartition partition : partitions) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,12 @@ public void testGroupSubscribe() {
// `groupSubscribe` does not accumulate
assertFalse(state.groupSubscribe(singleton(topic1)));
assertEquals(singleton(topic1), state.metadataTopics());

state.subscribe(singleton("anotherTopic"), rebalanceListener);
assertEquals(Utils.mkSet(topic1, "anotherTopic"), state.metadataTopics());

assertFalse(state.groupSubscribe(singleton("anotherTopic")));
assertEquals(singleton("anotherTopic"), state.metadataTopics());
}

@Test
Expand Down