Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion checkstyle/suppressions.xml
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@
files="(ConsumerCoordinator|Fetcher|Sender|KafkaProducer|BufferPool|ConfigDef|RecordAccumulator|KerberosLogin|AbstractRequest|AbstractResponse|Selector|SslFactory|SslTransportLayer|SaslClientAuthenticator|SaslClientCallbackHandler|SaslServerAuthenticator|AbstractCoordinator).java"/>

<suppress checks="JavaNCSS"
files="AbstractRequest.java|KerberosLogin.java|WorkerSinkTaskTest.java|TransactionManagerTest.java|SenderTest.java|KafkaAdminClient.java"/>
files="(AbstractRequest|KerberosLogin|WorkerSinkTaskTest|TransactionManagerTest|SenderTest|KafkaAdminClient|ConsumerCoordinatorTest).java"/>

<suppress checks="NPathComplexity"
files="(BufferPool|Fetcher|MetricName|Node|ConfigDef|RecordBatch|SslFactory|SslTransportLayer|MetadataResponse|KerberosLogin|Selector|Sender|Serdes|TokenInformation|Agent|Values|PluginUtils|MiniTrogdorCluster|TasksRequest|KafkaProducer).java"/>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1357,7 +1357,7 @@ private static class MetadataSnapshot {

private MetadataSnapshot(SubscriptionState subscription, Cluster cluster, int version) {
Map<String, Integer> partitionsPerTopic = new HashMap<>();
for (String topic : subscription.groupSubscription()) {
for (String topic : subscription.metadataTopics()) {
Integer numPartitions = cluster.partitionCountForTopic(topic);
if (numPartitions != null)
partitionsPerTopic.put(topic, numPartitions);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ public synchronized MetadataRequest.Builder newMetadataRequestBuilder() {
if (subscription.hasPatternSubscription())
return MetadataRequest.Builder.allTopics();
List<String> topics = new ArrayList<>();
topics.addAll(subscription.groupSubscription());
topics.addAll(subscription.metadataTopics());
topics.addAll(transientTopics);
return new MetadataRequest.Builder(topics, allowAutoTopicCreation);
}
Expand All @@ -72,7 +72,7 @@ synchronized void clearTransientTopics() {

@Override
protected synchronized boolean retainTopic(String topic, boolean isInternal, long nowMs) {
if (transientTopics.contains(topic) || subscription.isGroupSubscribed(topic))
if (transientTopics.contains(topic) || subscription.needsMetadata(topic))
return true;

if (isInternal && !includeInternalTopics)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -183,12 +183,6 @@ private boolean changeSubscription(Set<String> topicsToSubscribe) {
return false;

subscription = topicsToSubscribe;
if (subscriptionType != SubscriptionType.USER_ASSIGNED) {
groupSubscription = new HashSet<>(groupSubscription);
groupSubscription.addAll(topicsToSubscribe);
} else {
groupSubscription = new HashSet<>(topicsToSubscribe);
}
return true;
}

Expand All @@ -208,7 +202,7 @@ synchronized boolean groupSubscribe(Collection<String> topics) {
* Reset the group's subscription to only contain topics subscribed by this consumer.
*/
synchronized void resetGroupSubscription() {
groupSubscription = subscription;
groupSubscription = Collections.emptySet();
}

/**
Expand Down Expand Up @@ -335,22 +329,22 @@ public synchronized Set<TopicPartition> pausedPartitions() {
}

/**
* Get the subscription for the group. For the leader, this will include the union of the
* subscriptions of all group members. For followers, it is just that member's subscription.
* This is used when querying topic metadata to detect the metadata changes which would
* Get the subcription topics for which metadata is required . For the leader, this will include
* the union of the subscriptions of all group members. For followers, it is just that member's
* subscription. This is used when querying topic metadata to detect the metadata changes which would
* require rebalancing. The leader fetches metadata for all topics in the group so that it
* can do the partition assignment (which requires at least partition counts for all topics
* to be assigned).
*
* @return The union of all subscribed topics in the group if this member is the leader
* of the current generation; otherwise it returns the same set as {@link #subscription()}
*/
synchronized Set<String> groupSubscription() {
return this.groupSubscription;
synchronized Set<String> metadataTopics() {
return groupSubscription.isEmpty() ? subscription : groupSubscription;
}

synchronized boolean isGroupSubscribed(String topic) {
return groupSubscription.contains(topic);
synchronized boolean needsMetadata(String topic) {
return subscription.contains(topic) || groupSubscription.contains(topic);
}

private TopicPartitionState assignedState(TopicPartition tp) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -580,7 +580,7 @@ public void testNormalJoinGroupLeader() {

assertFalse(coordinator.rejoinNeededOrPending());
assertEquals(toSet(assigned), subscriptions.assignedPartitions());
assertEquals(subscription, subscriptions.groupSubscription());
assertEquals(subscription, subscriptions.metadataTopics());
assertEquals(0, rebalanceListener.revokedCount);
assertNull(rebalanceListener.revoked);
assertEquals(1, rebalanceListener.assignedCount);
Expand Down Expand Up @@ -639,7 +639,7 @@ public void testOutdatedCoordinatorAssignment() {

assertFalse(coordinator.rejoinNeededOrPending());
assertEquals(toSet(newAssignment), subscriptions.assignedPartitions());
assertEquals(toSet(newSubscription), subscriptions.groupSubscription());
assertEquals(toSet(newSubscription), subscriptions.metadataTopics());
assertEquals(protocol == EAGER ? 1 : 0, rebalanceListener.revokedCount);
assertEquals(1, rebalanceListener.assignedCount);
assertEquals(assigned, rebalanceListener.assigned);
Expand Down Expand Up @@ -678,7 +678,7 @@ public void testPatternJoinGroupLeader() {

assertFalse(coordinator.rejoinNeededOrPending());
assertEquals(2, subscriptions.numAssignedPartitions());
assertEquals(2, subscriptions.groupSubscription().size());
assertEquals(2, subscriptions.metadataTopics().size());
assertEquals(2, subscriptions.subscription().size());
// callback not triggered at all since there's nothing to be revoked
assertEquals(0, rebalanceListener.revokedCount);
Expand Down Expand Up @@ -909,7 +909,7 @@ public void testNormalJoinGroupFollower() {

assertFalse(coordinator.rejoinNeededOrPending());
assertEquals(toSet(assigned), subscriptions.assignedPartitions());
assertEquals(subscription, subscriptions.groupSubscription());
assertEquals(subscription, subscriptions.metadataTopics());
assertEquals(0, rebalanceListener.revokedCount);
assertNull(rebalanceListener.revoked);
assertEquals(1, rebalanceListener.assignedCount);
Expand Down Expand Up @@ -1231,6 +1231,45 @@ public void testUpdateMetadataDuringRebalance() {
assertEquals(new HashSet<>(Arrays.asList(tp1, tp2)), subscriptions.assignedPartitions());
}

/**
* Verifies that subscription change updates SubscriptionState correctly even after JoinGroup failures
* that don't re-invoke onJoinPrepare.
*/
@Test
public void testSubscriptionChangeWithAuthorizationFailure() {
final String consumerId = "consumer";

// Subscribe to two topics of which only one is authorized and verify that metadata failure is propagated.
subscriptions.subscribe(Utils.mkSet(topic1, topic2), rebalanceListener);
client.prepareMetadataUpdate(TestUtils.metadataUpdateWith("kafka-cluster", 1,
Collections.singletonMap(topic2, Errors.TOPIC_AUTHORIZATION_FAILED), singletonMap(topic1, 1)));
assertThrows(TopicAuthorizationException.class, () -> coordinator.poll(time.timer(Long.MAX_VALUE)));

client.respond(groupCoordinatorResponse(node, Errors.NONE));
coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE));

// Fail the first JoinGroup request
client.prepareResponse(joinGroupLeaderResponse(0, consumerId, Collections.emptyMap(),
Errors.GROUP_AUTHORIZATION_FAILED));
assertThrows(GroupAuthorizationException.class, () -> coordinator.poll(time.timer(Long.MAX_VALUE)));

// Change subscription to include only the authorized topic. Complete rebalance and check that
// references to topic2 have been removed from SubscriptionState.
subscriptions.subscribe(Utils.mkSet(topic1), rebalanceListener);
assertEquals(Collections.singleton(topic1), subscriptions.metadataTopics());
client.prepareMetadataUpdate(TestUtils.metadataUpdateWith("kafka-cluster", 1,
Collections.emptyMap(), singletonMap(topic1, 1)));

Map<String, List<String>> memberSubscriptions = singletonMap(consumerId, singletonList(topic1));
partitionAssignor.prepare(singletonMap(consumerId, Arrays.asList(t1p)));
client.prepareResponse(joinGroupLeaderResponse(1, consumerId, memberSubscriptions, Errors.NONE));
client.prepareResponse(syncGroupResponse(singletonList(t1p), Errors.NONE));
coordinator.poll(time.timer(Long.MAX_VALUE));

assertEquals(singleton(topic1), subscriptions.subscription());
assertEquals(singleton(topic1), subscriptions.metadataTopics());
}

@Test
public void testWakeupFromAssignmentCallback() {
final String topic = "topic1";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,8 +102,11 @@ public void testUserAssignment() {
@Test
public void testNormalSubscription() {
subscription.subscribe(Utils.mkSet("foo", "bar", "__consumer_offsets"), new NoOpConsumerRebalanceListener());
subscription.groupSubscribe(Utils.mkSet("baz"));
subscription.groupSubscribe(Utils.mkSet("baz", "foo", "bar", "__consumer_offsets"));
testBasicSubscription(Utils.mkSet("foo", "bar", "baz"), Utils.mkSet("__consumer_offsets"));

subscription.resetGroupSubscription();
testBasicSubscription(Utils.mkSet("foo", "bar"), Utils.mkSet("__consumer_offsets"));
}

@Test
Expand Down