diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java index 4d93a1b4fcbcd..86f4e68fb5578 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java @@ -345,7 +345,17 @@ public synchronized Set pausedPartitions() { * of the current generation; otherwise it returns the same set as {@link #subscription()} */ synchronized Set metadataTopics() { - return groupSubscription.isEmpty() ? subscription : groupSubscription; + if (groupSubscription.isEmpty()) + return subscription; + else if (groupSubscription.containsAll(subscription)) + return groupSubscription; + else { + // When subscription changes `groupSubscription` may be outdated, ensure that + // new subscription topics are returned. + Set topics = new HashSet<>(groupSubscription); + topics.addAll(subscription); + return topics; + } } synchronized boolean needsMetadata(String topic) { diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java index afba13b5b3bd9..d61010ee94aae 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java @@ -782,6 +782,35 @@ public void testOutdatedCoordinatorAssignment() { assertEquals(assigned, rebalanceListener.assigned); } + @Test + public void testMetadataTopicsDuringSubscriptionChange() { + final String consumerId = "subscription_change"; + final List oldSubscription = singletonList(topic1); + final List oldAssignment = Collections.singletonList(t1p); + final List newSubscription = singletonList(topic2); + final List newAssignment = Collections.singletonList(t2p); + + subscriptions.subscribe(toSet(oldSubscription), rebalanceListener); + assertEquals(toSet(oldSubscription), subscriptions.metadataTopics()); + + client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE)); + coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE)); + + prepareJoinAndSyncResponse(consumerId, 1, oldSubscription, oldAssignment); + + coordinator.poll(time.timer(0)); + assertEquals(toSet(oldSubscription), subscriptions.metadataTopics()); + + subscriptions.subscribe(toSet(newSubscription), rebalanceListener); + assertEquals(Utils.mkSet(topic1, topic2), subscriptions.metadataTopics()); + + prepareJoinAndSyncResponse(consumerId, 2, newSubscription, newAssignment); + coordinator.poll(time.timer(Long.MAX_VALUE)); + assertFalse(coordinator.rejoinNeededOrPending()); + assertEquals(toSet(newAssignment), subscriptions.assignedPartitions()); + assertEquals(toSet(newSubscription), subscriptions.metadataTopics()); + } + @Test public void testPatternJoinGroupLeader() { final String consumerId = "leader"; @@ -3107,6 +3136,19 @@ private void prepareOffsetCommitRequest(final Map expected client.prepareResponse(offsetCommitRequestMatcher(expectedOffsets), offsetCommitResponse(errors), disconnected); } + private void prepareJoinAndSyncResponse(String consumerId, int generation, List subscription, List assignment) { + partitionAssignor.prepare(singletonMap(consumerId, assignment)); + client.prepareResponse( + joinGroupLeaderResponse( + generation, consumerId, singletonMap(consumerId, subscription), Errors.NONE)); + client.prepareResponse(body -> { + SyncGroupRequest sync = (SyncGroupRequest) body; + return sync.data.memberId().equals(consumerId) && + sync.data.generationId() == generation && + sync.groupAssignments().containsKey(consumerId); + }, syncGroupResponse(assignment, Errors.NONE)); + } + private Map partitionErrors(Collection partitions, Errors error) { final Map errors = new HashMap<>(); for (TopicPartition partition : partitions) { diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java index cc74652abe832..ae00b8e3e4b69 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java @@ -122,6 +122,12 @@ public void testGroupSubscribe() { // `groupSubscribe` does not accumulate assertFalse(state.groupSubscribe(singleton(topic1))); assertEquals(singleton(topic1), state.metadataTopics()); + + state.subscribe(singleton("anotherTopic"), rebalanceListener); + assertEquals(Utils.mkSet(topic1, "anotherTopic"), state.metadataTopics()); + + assertFalse(state.groupSubscribe(singleton("anotherTopic"))); + assertEquals(singleton("anotherTopic"), state.metadataTopics()); } @Test