diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml
index 38eeab014d997..0ffe04d55fad7 100644
--- a/checkstyle/suppressions.xml
+++ b/checkstyle/suppressions.xml
@@ -62,7 +62,7 @@
files="(ConsumerCoordinator|Fetcher|Sender|KafkaProducer|BufferPool|ConfigDef|RecordAccumulator|KerberosLogin|AbstractRequest|AbstractResponse|Selector|SslFactory|SslTransportLayer|SaslClientAuthenticator|SaslClientCallbackHandler|SaslServerAuthenticator|AbstractCoordinator).java"/>
+ files="(AbstractRequest|KerberosLogin|WorkerSinkTaskTest|TransactionManagerTest|SenderTest|KafkaAdminClient|ConsumerCoordinatorTest).java"/>
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
index c6dfb75552932..6610bfb025b46 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
@@ -1357,7 +1357,7 @@ private static class MetadataSnapshot {
private MetadataSnapshot(SubscriptionState subscription, Cluster cluster, int version) {
Map partitionsPerTopic = new HashMap<>();
- for (String topic : subscription.groupSubscription()) {
+ for (String topic : subscription.metadataTopics()) {
Integer numPartitions = cluster.partitionCountForTopic(topic);
if (numPartitions != null)
partitionsPerTopic.put(topic, numPartitions);
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerMetadata.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerMetadata.java
index fbdf1c6c8595c..ef7d92417471b 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerMetadata.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerMetadata.java
@@ -55,7 +55,7 @@ public synchronized MetadataRequest.Builder newMetadataRequestBuilder() {
if (subscription.hasPatternSubscription())
return MetadataRequest.Builder.allTopics();
List topics = new ArrayList<>();
- topics.addAll(subscription.groupSubscription());
+ topics.addAll(subscription.metadataTopics());
topics.addAll(transientTopics);
return new MetadataRequest.Builder(topics, allowAutoTopicCreation);
}
@@ -72,7 +72,7 @@ synchronized void clearTransientTopics() {
@Override
protected synchronized boolean retainTopic(String topic, boolean isInternal, long nowMs) {
- if (transientTopics.contains(topic) || subscription.isGroupSubscribed(topic))
+ if (transientTopics.contains(topic) || subscription.needsMetadata(topic))
return true;
if (isInternal && !includeInternalTopics)
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
index 92b997048e5f2..ec7c376ffc7c7 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
@@ -183,12 +183,6 @@ private boolean changeSubscription(Set topicsToSubscribe) {
return false;
subscription = topicsToSubscribe;
- if (subscriptionType != SubscriptionType.USER_ASSIGNED) {
- groupSubscription = new HashSet<>(groupSubscription);
- groupSubscription.addAll(topicsToSubscribe);
- } else {
- groupSubscription = new HashSet<>(topicsToSubscribe);
- }
return true;
}
@@ -208,7 +202,7 @@ synchronized boolean groupSubscribe(Collection topics) {
* Reset the group's subscription to only contain topics subscribed by this consumer.
*/
synchronized void resetGroupSubscription() {
- groupSubscription = subscription;
+ groupSubscription = Collections.emptySet();
}
/**
@@ -335,9 +329,9 @@ public synchronized Set pausedPartitions() {
}
/**
- * Get the subscription for the group. For the leader, this will include the union of the
- * subscriptions of all group members. For followers, it is just that member's subscription.
- * This is used when querying topic metadata to detect the metadata changes which would
+ * Get the subcription topics for which metadata is required . For the leader, this will include
+ * the union of the subscriptions of all group members. For followers, it is just that member's
+ * subscription. This is used when querying topic metadata to detect the metadata changes which would
* require rebalancing. The leader fetches metadata for all topics in the group so that it
* can do the partition assignment (which requires at least partition counts for all topics
* to be assigned).
@@ -345,12 +339,12 @@ public synchronized Set pausedPartitions() {
* @return The union of all subscribed topics in the group if this member is the leader
* of the current generation; otherwise it returns the same set as {@link #subscription()}
*/
- synchronized Set groupSubscription() {
- return this.groupSubscription;
+ synchronized Set metadataTopics() {
+ return groupSubscription.isEmpty() ? subscription : groupSubscription;
}
- synchronized boolean isGroupSubscribed(String topic) {
- return groupSubscription.contains(topic);
+ synchronized boolean needsMetadata(String topic) {
+ return subscription.contains(topic) || groupSubscription.contains(topic);
}
private TopicPartitionState assignedState(TopicPartition tp) {
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
index 7eaeb42653a20..b0b6fb80e50c7 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
@@ -580,7 +580,7 @@ public void testNormalJoinGroupLeader() {
assertFalse(coordinator.rejoinNeededOrPending());
assertEquals(toSet(assigned), subscriptions.assignedPartitions());
- assertEquals(subscription, subscriptions.groupSubscription());
+ assertEquals(subscription, subscriptions.metadataTopics());
assertEquals(0, rebalanceListener.revokedCount);
assertNull(rebalanceListener.revoked);
assertEquals(1, rebalanceListener.assignedCount);
@@ -639,7 +639,7 @@ public void testOutdatedCoordinatorAssignment() {
assertFalse(coordinator.rejoinNeededOrPending());
assertEquals(toSet(newAssignment), subscriptions.assignedPartitions());
- assertEquals(toSet(newSubscription), subscriptions.groupSubscription());
+ assertEquals(toSet(newSubscription), subscriptions.metadataTopics());
assertEquals(protocol == EAGER ? 1 : 0, rebalanceListener.revokedCount);
assertEquals(1, rebalanceListener.assignedCount);
assertEquals(assigned, rebalanceListener.assigned);
@@ -678,7 +678,7 @@ public void testPatternJoinGroupLeader() {
assertFalse(coordinator.rejoinNeededOrPending());
assertEquals(2, subscriptions.numAssignedPartitions());
- assertEquals(2, subscriptions.groupSubscription().size());
+ assertEquals(2, subscriptions.metadataTopics().size());
assertEquals(2, subscriptions.subscription().size());
// callback not triggered at all since there's nothing to be revoked
assertEquals(0, rebalanceListener.revokedCount);
@@ -909,7 +909,7 @@ public void testNormalJoinGroupFollower() {
assertFalse(coordinator.rejoinNeededOrPending());
assertEquals(toSet(assigned), subscriptions.assignedPartitions());
- assertEquals(subscription, subscriptions.groupSubscription());
+ assertEquals(subscription, subscriptions.metadataTopics());
assertEquals(0, rebalanceListener.revokedCount);
assertNull(rebalanceListener.revoked);
assertEquals(1, rebalanceListener.assignedCount);
@@ -1231,6 +1231,45 @@ public void testUpdateMetadataDuringRebalance() {
assertEquals(new HashSet<>(Arrays.asList(tp1, tp2)), subscriptions.assignedPartitions());
}
+ /**
+ * Verifies that subscription change updates SubscriptionState correctly even after JoinGroup failures
+ * that don't re-invoke onJoinPrepare.
+ */
+ @Test
+ public void testSubscriptionChangeWithAuthorizationFailure() {
+ final String consumerId = "consumer";
+
+ // Subscribe to two topics of which only one is authorized and verify that metadata failure is propagated.
+ subscriptions.subscribe(Utils.mkSet(topic1, topic2), rebalanceListener);
+ client.prepareMetadataUpdate(TestUtils.metadataUpdateWith("kafka-cluster", 1,
+ Collections.singletonMap(topic2, Errors.TOPIC_AUTHORIZATION_FAILED), singletonMap(topic1, 1)));
+ assertThrows(TopicAuthorizationException.class, () -> coordinator.poll(time.timer(Long.MAX_VALUE)));
+
+ client.respond(groupCoordinatorResponse(node, Errors.NONE));
+ coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE));
+
+ // Fail the first JoinGroup request
+ client.prepareResponse(joinGroupLeaderResponse(0, consumerId, Collections.emptyMap(),
+ Errors.GROUP_AUTHORIZATION_FAILED));
+ assertThrows(GroupAuthorizationException.class, () -> coordinator.poll(time.timer(Long.MAX_VALUE)));
+
+ // Change subscription to include only the authorized topic. Complete rebalance and check that
+ // references to topic2 have been removed from SubscriptionState.
+ subscriptions.subscribe(Utils.mkSet(topic1), rebalanceListener);
+ assertEquals(Collections.singleton(topic1), subscriptions.metadataTopics());
+ client.prepareMetadataUpdate(TestUtils.metadataUpdateWith("kafka-cluster", 1,
+ Collections.emptyMap(), singletonMap(topic1, 1)));
+
+ Map> memberSubscriptions = singletonMap(consumerId, singletonList(topic1));
+ partitionAssignor.prepare(singletonMap(consumerId, Arrays.asList(t1p)));
+ client.prepareResponse(joinGroupLeaderResponse(1, consumerId, memberSubscriptions, Errors.NONE));
+ client.prepareResponse(syncGroupResponse(singletonList(t1p), Errors.NONE));
+ coordinator.poll(time.timer(Long.MAX_VALUE));
+
+ assertEquals(singleton(topic1), subscriptions.subscription());
+ assertEquals(singleton(topic1), subscriptions.metadataTopics());
+ }
+
@Test
public void testWakeupFromAssignmentCallback() {
final String topic = "topic1";
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerMetadataTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerMetadataTest.java
index 33d102d3b748f..b373192ca7066 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerMetadataTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerMetadataTest.java
@@ -102,8 +102,11 @@ public void testUserAssignment() {
@Test
public void testNormalSubscription() {
subscription.subscribe(Utils.mkSet("foo", "bar", "__consumer_offsets"), new NoOpConsumerRebalanceListener());
- subscription.groupSubscribe(Utils.mkSet("baz"));
+ subscription.groupSubscribe(Utils.mkSet("baz", "foo", "bar", "__consumer_offsets"));
testBasicSubscription(Utils.mkSet("foo", "bar", "baz"), Utils.mkSet("__consumer_offsets"));
+
+ subscription.resetGroupSubscription();
+ testBasicSubscription(Utils.mkSet("foo", "bar"), Utils.mkSet("__consumer_offsets"));
}
@Test