From 63aa4e38d29862618e934a49a5671dee3ad2463d Mon Sep 17 00:00:00 2001 From: Rajini Sivaram Date: Sun, 12 Jan 2020 16:34:45 +0000 Subject: [PATCH 1/6] KAFKA-9181; Ensure SubscriptionState.groupSubscription is updated even if onJoinPrepare not invoked after subscribe --- .../internals/ConsumerCoordinator.java | 5 +++ .../internals/ConsumerCoordinatorTest.java | 36 +++++++++++++++++++ 2 files changed, 41 insertions(+) diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java index c6dfb75552932..8d4db0e903561 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java @@ -211,6 +211,7 @@ public String protocolType() { protected JoinGroupRequestData.JoinGroupRequestProtocolCollection metadata() { log.debug("Joining group with current subscription: {}", subscriptions.subscription()); this.joinedSubscription = subscriptions.subscription(); + subscriptions.resetGroupSubscription(); JoinGroupRequestData.JoinGroupRequestProtocolCollection protocolSet = new JoinGroupRequestData.JoinGroupRequestProtocolCollection(); List topics = new ArrayList<>(joinedSubscription); @@ -1396,4 +1397,8 @@ public void invoke() { RebalanceProtocol getProtocol() { return protocol; } + + SubscriptionState subscriptions() { + return subscriptions; + } } diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java index 7eaeb42653a20..779ce271c3cc5 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java @@ -1231,6 +1231,42 @@ public void testUpdateMetadataDuringRebalance() { assertEquals(new HashSet<>(Arrays.asList(tp1, tp2)), subscriptions.assignedPartitions()); } + /** + * Verifies that subscription change updates SubscriptionState correctly even after JoinGroup failures + * that don't re-invoke onJoinPrepare. + */ + @Test + public void testSubscriptionChangeWithAuthorizationFailure() { + final String consumerId = "consumer"; + + // Subscribe to two topics of which only one is authorized and verify that metadata failure is propagated. + subscriptions.subscribe(Utils.mkSet(topic1, topic2), rebalanceListener); + client.prepareMetadataUpdate(TestUtils.metadataUpdateWith("kafka-cluster", 1, + Collections.singletonMap(topic2, Errors.TOPIC_AUTHORIZATION_FAILED), singletonMap(topic1, 1))); + assertThrows(TopicAuthorizationException.class, () -> coordinator.poll(time.timer(Long.MAX_VALUE))); + + client.respond(groupCoordinatorResponse(node, Errors.NONE)); + coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE)); + + // Fail the first JoinGroup request + client.prepareResponse(joinGroupLeaderResponse(0, consumerId, Collections.emptyMap(), + Errors.GROUP_AUTHORIZATION_FAILED)); + assertThrows(GroupAuthorizationException.class, () -> coordinator.poll(time.timer(Long.MAX_VALUE))); + + // Change subscription to include only the authorized topic. Complete rebalance and check that + // references to topic2 have been removed from SubscriptionState. + subscriptions.subscribe(Utils.mkSet(topic1), rebalanceListener); + + Map> memberSubscriptions = singletonMap(consumerId, singletonList(topic1)); + partitionAssignor.prepare(singletonMap(consumerId, Arrays.asList(t1p))); + client.prepareResponse(joinGroupLeaderResponse(1, consumerId, memberSubscriptions, Errors.NONE)); + client.prepareResponse(syncGroupResponse(singletonList(t1p), Errors.NONE)); + coordinator.poll(time.timer(Long.MAX_VALUE)); + + assertEquals(singleton(topic1), coordinator.subscriptions().subscription()); + assertEquals(singleton(topic1), coordinator.subscriptions().groupSubscription()); + } + @Test public void testWakeupFromAssignmentCallback() { final String topic = "topic1"; From 4384fbce029ab8431d82263bfb1c6079e69e6a81 Mon Sep 17 00:00:00 2001 From: Rajini Sivaram Date: Tue, 14 Jan 2020 16:51:41 +0000 Subject: [PATCH 2/6] Address review comment --- .../kafka/clients/consumer/internals/ConsumerCoordinator.java | 4 ---- .../clients/consumer/internals/ConsumerCoordinatorTest.java | 4 ++-- 2 files changed, 2 insertions(+), 6 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java index 8d4db0e903561..0bd8eb051aa48 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java @@ -1397,8 +1397,4 @@ public void invoke() { RebalanceProtocol getProtocol() { return protocol; } - - SubscriptionState subscriptions() { - return subscriptions; - } } diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java index 779ce271c3cc5..a377bc405a695 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java @@ -1263,8 +1263,8 @@ public void testSubscriptionChangeWithAuthorizationFailure() { client.prepareResponse(syncGroupResponse(singletonList(t1p), Errors.NONE)); coordinator.poll(time.timer(Long.MAX_VALUE)); - assertEquals(singleton(topic1), coordinator.subscriptions().subscription()); - assertEquals(singleton(topic1), coordinator.subscriptions().groupSubscription()); + assertEquals(singleton(topic1), subscriptions.subscription()); + assertEquals(singleton(topic1), subscriptions.groupSubscription()); } @Test From 797aa7c527de1ea635fa045aec4ea53693e38792 Mon Sep 17 00:00:00 2001 From: Rajini Sivaram Date: Wed, 22 Jan 2020 12:37:19 +0000 Subject: [PATCH 3/6] Address review comment --- checkstyle/suppressions.xml | 2 +- .../kafka/clients/consumer/internals/ConsumerCoordinator.java | 1 - 2 files changed, 1 insertion(+), 2 deletions(-) diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml index 38eeab014d997..0ffe04d55fad7 100644 --- a/checkstyle/suppressions.xml +++ b/checkstyle/suppressions.xml @@ -62,7 +62,7 @@ files="(ConsumerCoordinator|Fetcher|Sender|KafkaProducer|BufferPool|ConfigDef|RecordAccumulator|KerberosLogin|AbstractRequest|AbstractResponse|Selector|SslFactory|SslTransportLayer|SaslClientAuthenticator|SaslClientCallbackHandler|SaslServerAuthenticator|AbstractCoordinator).java"/> + files="(AbstractRequest|KerberosLogin|WorkerSinkTaskTest|TransactionManagerTest|SenderTest|KafkaAdminClient|ConsumerCoordinatorTest).java"/> diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java index 0bd8eb051aa48..947c8b57d41ee 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java @@ -690,7 +690,6 @@ protected void onJoinPrepare(int generation, String memberId) { } isLeader = false; - subscriptions.resetGroupSubscription(); if (exception != null) { throw new KafkaException("User rebalance callback throws an error", exception); From 6f574eaaa40d40b530808a50ced0f328012ade99 Mon Sep 17 00:00:00 2001 From: Rajini Sivaram Date: Wed, 22 Jan 2020 14:44:27 +0000 Subject: [PATCH 4/6] Don't include subscription topics in `groupSubscription` --- .../internals/ConsumerCoordinator.java | 2 +- .../consumer/internals/ConsumerMetadata.java | 4 ++-- .../consumer/internals/SubscriptionState.java | 24 ++++++++----------- .../internals/ConsumerCoordinatorTest.java | 13 ++++++---- 4 files changed, 21 insertions(+), 22 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java index 947c8b57d41ee..353cb190a0e8b 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java @@ -1357,7 +1357,7 @@ private static class MetadataSnapshot { private MetadataSnapshot(SubscriptionState subscription, Cluster cluster, int version) { Map partitionsPerTopic = new HashMap<>(); - for (String topic : subscription.groupSubscription()) { + for (String topic : subscription.metadataTopics()) { Integer numPartitions = cluster.partitionCountForTopic(topic); if (numPartitions != null) partitionsPerTopic.put(topic, numPartitions); diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerMetadata.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerMetadata.java index fbdf1c6c8595c..ef7d92417471b 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerMetadata.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerMetadata.java @@ -55,7 +55,7 @@ public synchronized MetadataRequest.Builder newMetadataRequestBuilder() { if (subscription.hasPatternSubscription()) return MetadataRequest.Builder.allTopics(); List topics = new ArrayList<>(); - topics.addAll(subscription.groupSubscription()); + topics.addAll(subscription.metadataTopics()); topics.addAll(transientTopics); return new MetadataRequest.Builder(topics, allowAutoTopicCreation); } @@ -72,7 +72,7 @@ synchronized void clearTransientTopics() { @Override protected synchronized boolean retainTopic(String topic, boolean isInternal, long nowMs) { - if (transientTopics.contains(topic) || subscription.isGroupSubscribed(topic)) + if (transientTopics.contains(topic) || subscription.needsMetadata(topic)) return true; if (isInternal && !includeInternalTopics) diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java index 92b997048e5f2..7ed8614d2195f 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java @@ -183,12 +183,6 @@ private boolean changeSubscription(Set topicsToSubscribe) { return false; subscription = topicsToSubscribe; - if (subscriptionType != SubscriptionType.USER_ASSIGNED) { - groupSubscription = new HashSet<>(groupSubscription); - groupSubscription.addAll(topicsToSubscribe); - } else { - groupSubscription = new HashSet<>(topicsToSubscribe); - } return true; } @@ -208,7 +202,7 @@ synchronized boolean groupSubscribe(Collection topics) { * Reset the group's subscription to only contain topics subscribed by this consumer. */ synchronized void resetGroupSubscription() { - groupSubscription = subscription; + groupSubscription = Collections.emptySet(); } /** @@ -335,9 +329,9 @@ public synchronized Set pausedPartitions() { } /** - * Get the subscription for the group. For the leader, this will include the union of the - * subscriptions of all group members. For followers, it is just that member's subscription. - * This is used when querying topic metadata to detect the metadata changes which would + * Get the subcription topics for which metadata is required . For the leader, this will include + * the union of the subscriptions of all group members. For followers, it is just that member's + * subscription. This is used when querying topic metadata to detect the metadata changes which would * require rebalancing. The leader fetches metadata for all topics in the group so that it * can do the partition assignment (which requires at least partition counts for all topics * to be assigned). @@ -345,12 +339,14 @@ public synchronized Set pausedPartitions() { * @return The union of all subscribed topics in the group if this member is the leader * of the current generation; otherwise it returns the same set as {@link #subscription()} */ - synchronized Set groupSubscription() { - return this.groupSubscription; + synchronized Set metadataTopics() { + Set topics = new HashSet<>(groupSubscription); + topics.addAll(subscription); + return topics; } - synchronized boolean isGroupSubscribed(String topic) { - return groupSubscription.contains(topic); + synchronized boolean needsMetadata(String topic) { + return subscription.contains(topic) || groupSubscription.contains(topic); } private TopicPartitionState assignedState(TopicPartition tp) { diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java index a377bc405a695..b0b6fb80e50c7 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java @@ -580,7 +580,7 @@ public void testNormalJoinGroupLeader() { assertFalse(coordinator.rejoinNeededOrPending()); assertEquals(toSet(assigned), subscriptions.assignedPartitions()); - assertEquals(subscription, subscriptions.groupSubscription()); + assertEquals(subscription, subscriptions.metadataTopics()); assertEquals(0, rebalanceListener.revokedCount); assertNull(rebalanceListener.revoked); assertEquals(1, rebalanceListener.assignedCount); @@ -639,7 +639,7 @@ public void testOutdatedCoordinatorAssignment() { assertFalse(coordinator.rejoinNeededOrPending()); assertEquals(toSet(newAssignment), subscriptions.assignedPartitions()); - assertEquals(toSet(newSubscription), subscriptions.groupSubscription()); + assertEquals(toSet(newSubscription), subscriptions.metadataTopics()); assertEquals(protocol == EAGER ? 1 : 0, rebalanceListener.revokedCount); assertEquals(1, rebalanceListener.assignedCount); assertEquals(assigned, rebalanceListener.assigned); @@ -678,7 +678,7 @@ public void testPatternJoinGroupLeader() { assertFalse(coordinator.rejoinNeededOrPending()); assertEquals(2, subscriptions.numAssignedPartitions()); - assertEquals(2, subscriptions.groupSubscription().size()); + assertEquals(2, subscriptions.metadataTopics().size()); assertEquals(2, subscriptions.subscription().size()); // callback not triggered at all since there's nothing to be revoked assertEquals(0, rebalanceListener.revokedCount); @@ -909,7 +909,7 @@ public void testNormalJoinGroupFollower() { assertFalse(coordinator.rejoinNeededOrPending()); assertEquals(toSet(assigned), subscriptions.assignedPartitions()); - assertEquals(subscription, subscriptions.groupSubscription()); + assertEquals(subscription, subscriptions.metadataTopics()); assertEquals(0, rebalanceListener.revokedCount); assertNull(rebalanceListener.revoked); assertEquals(1, rebalanceListener.assignedCount); @@ -1256,6 +1256,9 @@ public void testSubscriptionChangeWithAuthorizationFailure() { // Change subscription to include only the authorized topic. Complete rebalance and check that // references to topic2 have been removed from SubscriptionState. subscriptions.subscribe(Utils.mkSet(topic1), rebalanceListener); + assertEquals(Collections.singleton(topic1), subscriptions.metadataTopics()); + client.prepareMetadataUpdate(TestUtils.metadataUpdateWith("kafka-cluster", 1, + Collections.emptyMap(), singletonMap(topic1, 1))); Map> memberSubscriptions = singletonMap(consumerId, singletonList(topic1)); partitionAssignor.prepare(singletonMap(consumerId, Arrays.asList(t1p))); @@ -1264,7 +1267,7 @@ public void testSubscriptionChangeWithAuthorizationFailure() { coordinator.poll(time.timer(Long.MAX_VALUE)); assertEquals(singleton(topic1), subscriptions.subscription()); - assertEquals(singleton(topic1), subscriptions.groupSubscription()); + assertEquals(singleton(topic1), subscriptions.metadataTopics()); } @Test From 57765e11ecfa34b51048e1f1c361ecf1ab63dbd2 Mon Sep 17 00:00:00 2001 From: Rajini Sivaram Date: Wed, 22 Jan 2020 20:28:39 +0000 Subject: [PATCH 5/6] Address review comment --- .../kafka/clients/consumer/internals/SubscriptionState.java | 4 +--- .../clients/consumer/internals/ConsumerMetadataTest.java | 5 ++++- 2 files changed, 5 insertions(+), 4 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java index 7ed8614d2195f..ec7c376ffc7c7 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java @@ -340,9 +340,7 @@ public synchronized Set pausedPartitions() { * of the current generation; otherwise it returns the same set as {@link #subscription()} */ synchronized Set metadataTopics() { - Set topics = new HashSet<>(groupSubscription); - topics.addAll(subscription); - return topics; + return groupSubscription.isEmpty() ? subscription : groupSubscription; } synchronized boolean needsMetadata(String topic) { diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerMetadataTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerMetadataTest.java index 33d102d3b748f..b373192ca7066 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerMetadataTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerMetadataTest.java @@ -102,8 +102,11 @@ public void testUserAssignment() { @Test public void testNormalSubscription() { subscription.subscribe(Utils.mkSet("foo", "bar", "__consumer_offsets"), new NoOpConsumerRebalanceListener()); - subscription.groupSubscribe(Utils.mkSet("baz")); + subscription.groupSubscribe(Utils.mkSet("baz", "foo", "bar", "__consumer_offsets")); testBasicSubscription(Utils.mkSet("foo", "bar", "baz"), Utils.mkSet("__consumer_offsets")); + + subscription.resetGroupSubscription(); + testBasicSubscription(Utils.mkSet("foo", "bar"), Utils.mkSet("__consumer_offsets")); } @Test From 18a4917904d5aa779ec590d682afeb5d746f03a0 Mon Sep 17 00:00:00 2001 From: Rajini Sivaram Date: Thu, 23 Jan 2020 16:35:22 +0000 Subject: [PATCH 6/6] Address review comment --- .../kafka/clients/consumer/internals/ConsumerCoordinator.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java index 353cb190a0e8b..6610bfb025b46 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java @@ -211,7 +211,6 @@ public String protocolType() { protected JoinGroupRequestData.JoinGroupRequestProtocolCollection metadata() { log.debug("Joining group with current subscription: {}", subscriptions.subscription()); this.joinedSubscription = subscriptions.subscription(); - subscriptions.resetGroupSubscription(); JoinGroupRequestData.JoinGroupRequestProtocolCollection protocolSet = new JoinGroupRequestData.JoinGroupRequestProtocolCollection(); List topics = new ArrayList<>(joinedSubscription); @@ -690,6 +689,7 @@ protected void onJoinPrepare(int generation, String memberId) { } isLeader = false; + subscriptions.resetGroupSubscription(); if (exception != null) { throw new KafkaException("User rebalance callback throws an error", exception);