KAFKA-9181; Maintain clean separation between local and group subscriptions in consumer's SubscriptionState #7941
retest this please
I'm confused about a couple things. First, if a subscription changes while we are in the middle of a rebalance, do we need to call onJoinPrepare again? With the old semantics, we probably didn't need to because we revoked everything, but with "cooperative" rebalancing, perhaps we should? cc @guozhangwang
The second thing is that it seems surprising that we raise auth errors from the "group subscription." If consumer 1 has subscribed to "A" and consumer 2 has subscribed to "B," I probably wouldn't expect to get any auth errors for topic "B" from consumer 1. So maybe we should be ignoring auth errors not part of subscription()? Not sure if that is straightforward or if there are any additional implications.
@hachikuji Thanks for the review. If we decide that we always need to do onJoinPrepare, then we won't need this fix.
It is odd to propagate authorization failures from the group subscription since that feels more like an implementation detail. But at the same time, if we don't get metadata for some topics, is there any point in adding topics from group subscription to the ConsumerMetadata? We probably need to update javadoc for poll() if we don't change the current behaviour.
Regarding the first question: if a subscription changes while we are in the middle of a rebalance, we call onPartitionsRevoked immediately on the still-assigned partitions that are no longer part of the new subscription. For the returned assignment, we check whether it matches our join-subscription and, if not, trigger another rebalance (this is pre-429). So I think we still do not need to re-trigger onJoinPrepare when the subscription changes in the middle.
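The two checks described in this comment can be sketched roughly as follows. This is only an illustration under assumptions: `MidRebalanceSketch`, `TP`, `toRevoke`, and `assignmentMatches` are hypothetical names, not the consumer's actual classes or methods.

```java
import java.util.HashSet;
import java.util.Objects;
import java.util.Set;

// Hypothetical model of the two checks described above; not Kafka's actual code.
class MidRebalanceSketch {
    // Minimal stand-in for a topic partition.
    static final class TP {
        final String topic;
        final int partition;

        TP(String topic, int partition) {
            this.topic = topic;
            this.partition = partition;
        }

        @Override
        public boolean equals(Object o) {
            if (!(o instanceof TP)) return false;
            TP other = (TP) o;
            return topic.equals(other.topic) && partition == other.partition;
        }

        @Override
        public int hashCode() {
            return Objects.hash(topic, partition);
        }
    }

    // Still-assigned partitions whose topic is no longer subscribed.
    // These are the ones that would be revoked immediately.
    static Set<TP> toRevoke(Set<TP> assigned, Set<String> newSubscription) {
        Set<TP> revoked = new HashSet<>();
        for (TP tp : assigned) {
            if (!newSubscription.contains(tp.topic)) revoked.add(tp);
        }
        return revoked;
    }

    // False when the returned assignment contains a topic outside the
    // subscription it is compared against, which would call for another rebalance.
    static boolean assignmentMatches(Set<TP> assignment, Set<String> subscription) {
        for (TP tp : assignment) {
            if (!subscription.contains(tp.topic)) return false;
        }
        return true;
    }
}
```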
@rajinisivaram I think I understand the problem a little better now and I'm coming around to the approach in this PR. It does make sense to me to reset the group subscription at the time that we set joinedSubscription in ConsumerCoordinator. Then I guess my only question is whether we still need to reset it inside onJoinPrepare as well?
Another (possibly worse) idea I had is to change the implementation of resetGroupSubscription. Perhaps instead of setting it equal to the current local subscription, we should set it to null or empty. We could try to maintain a cleaner separation between the local and group subscriptions. We would then need to update ConsumerMetadata to work with both the local and group subscription.
@hachikuji Thanks for the reviews. I pushed two commits. The first one just removes reset from onJoinPrepare since we don't need it any more. I also wanted to try out your suggestion of maintaining clean separation between local and group subscriptions. The last commit does this. Do you think it is worth making that change?
…n if onJoinPrepare not invoked after subscribe
d142bb7 to b2db7c5
b2db7c5 to 6f574ea
Thanks @rajinisivaram. I think it probably is a bit cleaner to keep the group subscription separate. I left a minor suggestion.
synchronized Set<String> groupSubscription() {
    return this.groupSubscription;
synchronized Set<String> metadataTopics() {
    Set<String> topics = new HashSet<>(groupSubscription);
It might be borderline overkill, but I'm considering whether we could avoid the copy here with logic like the following:
if (!groupSubscription.isEmpty())
return groupSubscription;
else
return subscription;
Basically relying on the group subscription being a superset of the local subscription when it is defined.
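For illustration, the two variants under discussion can be put side by side. This is a minimal sketch, not the actual `SubscriptionState`: the class `SubscriptionSketch`, the `metadataTopicsWithCopy` name, and the `addAll(subscription)` step in the copying variant are assumptions made for the example.

```java
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;

// Hypothetical stand-in for the relevant part of SubscriptionState.
class SubscriptionSketch {
    private Set<String> subscription = new HashSet<>();      // local subscription
    private Set<String> groupSubscription = new HashSet<>(); // empty when not defined

    synchronized void subscribe(Set<String> topics) {
        subscription = new HashSet<>(topics);
    }

    synchronized void groupSubscribe(Set<String> topics) {
        groupSubscription = new HashSet<>(topics);
    }

    synchronized void resetGroupSubscription() {
        groupSubscription = Collections.emptySet();
    }

    // Copying variant: always builds the union of both sets.
    synchronized Set<String> metadataTopicsWithCopy() {
        Set<String> topics = new HashSet<>(groupSubscription);
        topics.addAll(subscription); // assumed step, not shown in the diff
        return topics;
    }

    // Copy-free variant: correct only while a non-empty group
    // subscription is a superset of the local subscription.
    synchronized Set<String> metadataTopics() {
        if (!groupSubscription.isEmpty())
            return groupSubscription;
        else
            return subscription;
    }
}
```

The copy-free variant returns the same topics as the copying one only while the superset assumption holds, so a test that sets a group subscription missing a locally subscribed topic would see the two diverge.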
@hachikuji Thanks for the review, updated. I had to update one unit test since it wasn't using a superset for the group subscription, but I think that is ok.
hachikuji left a comment
LGTM. Left a minor comment, but I'll leave it up to you.
}

isLeader = false;
subscriptions.resetGroupSubscription();
I guess we could restore this location with the change to the group subscription maintenance? I don't feel too strongly since the new location works also.
@hachikuji Thanks for the review. Restored this so that the only change from this PR is the clean separation of group and local subscriptions.
retest this please
@hachikuji Thanks for the reviews, merging to trunk.
Conflicts and/or compiler errors due to the fact that we temporarily reverted the commit that removes Scala 2.11 support:
* Exit.scala: replace SAMs with anonymous inner classes.
* MiniKdc.scala: take upstream changes.

# By A. Sophie Blee-Goldman (1) and others
# Via Jason Gustafson
* apache-github/trunk:
  KAFKA-9254; Overridden topic configs are reset after dynamic default change (apache#7870)
  MINOR: MiniKdc JVM shutdown hook fix (apache#7946)
  KAFKA-9152; Improve Sensor Retrieval (apache#7928)
  Correct exception message in DistributedHerder (apache#7995)
  KAFKA-7317: Use collections subscription for main consumer to reduce metadata (apache#7969)
  KAFKA-9181; Maintain clean separation between local and group subscriptions in consumer's SubscriptionState (apache#7941)
  KAFKA-7737; Use single path in producer for initializing the producerId (apache#7920)

# Conflicts:
# core/src/test/scala/kafka/security/minikdc/MiniKdc.scala
…ptions in consumer's SubscriptionState (apache#7941) Reviewers: Jason Gustafson <jason@confluent.io>, Guozhang Wang <wangguoz@gmail.com> (cherry picked from commit a565d1a)
The recent increase in the flakiness of one of the offset reset tests (KAFKA-9538) traces back to #7941. After investigation, we found that following this patch, the consumer was sending an additional metadata request prior to performing the group assignment. This slight timing difference was enough to trigger the test failures.

The problem turned out to be due to a bug in `SubscriptionState.groupSubscribe`, which no longer counted the local subscription when determining if there were new topics to fetch metadata for. Hence the extra metadata update. This patch restores the old logic.

Without the fix, we saw 30-50% test failures locally. With it, I could no longer reproduce the failure. However, #6561 is probably still needed to improve the resilience of this test.

Reviewers: Rajini Sivaram <rajinisivaram@googlemail.com>
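The difference between the two behaviours can be sketched as below. This is an illustrative model under assumptions, not the actual `SubscriptionState.groupSubscribe`: the class `GroupSubscribeSketch` and the method `groupSubscribeIgnoringLocal` are hypothetical, and the boolean return value is assumed to mean "there are new topics that need a metadata fetch".

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical model of the check described above; not Kafka's actual code.
class GroupSubscribeSketch {
    private final Set<String> subscription = new HashSet<>(); // local subscription
    private Set<String> groupSubscription = new HashSet<>();

    GroupSubscribeSketch(Set<String> localSubscription) {
        subscription.addAll(localSubscription);
    }

    // Behaviour described as the bug: only the previous group subscription
    // is consulted, so locally subscribed topics look "new".
    boolean groupSubscribeIgnoringLocal(Set<String> topics) {
        boolean hasNewTopics = !groupSubscription.containsAll(topics);
        groupSubscription = new HashSet<>(topics);
        return hasNewTopics;
    }

    // Behaviour described as the restored logic: a topic counts as new only
    // if it is in neither the local nor the previous group subscription.
    boolean groupSubscribe(Set<String> topics) {
        Set<String> known = new HashSet<>(groupSubscription);
        known.addAll(subscription);
        boolean hasNewTopics = !known.containsAll(topics);
        groupSubscription = new HashSet<>(topics);
        return hasNewTopics;
    }
}
```

With a local subscription of a single topic and a group subscription of that same topic, the first method reports new topics (and would prompt an extra metadata update) while the second does not.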
…#8095) The recent increase in the flakiness of one of the offset reset tests (KAFKA-9538) traces back to apache#7941. After investigation, we found that following this patch, the consumer was sending an additional metadata request prior to performing the group assignment. This slight timing difference was enough to trigger the test failures. The problem turned out to be due to a bug in `SubscriptionState.groupSubscribe`, which no longer counted the local subscription when determining if there were new topics to fetch metadata for. Hence the extra metadata update. This patch restores the old logic. Without the fix, we saw 30-50% test failures locally. With it, I could no longer reproduce the failure. However, apache#6561 is probably still needed to improve the resilience of this test. Reviewers: Rajini Sivaram <rajinisivaram@googlemail.com>
As described in KAFKA-9181, `kafka.api.SaslGssapiSslEndToEndAuthorizationTest.testNoConsumeWithoutDescribeAclViaSubscribe` occasionally hits an unexpected `TopicAuthorizationException` even after the topic is removed from the subscription. The test uses a small metadata refresh time and hence can see metadata responses before JoinGroup is processed. We currently rely on `onJoinPrepare` to reset `SubscriptionState.groupSubscription`, which accumulates topics until reset. If we process JoinGroup after a subscribe without a new `onJoinPrepare`, we leave the topic in `SubscriptionState.groupSubscription` and hence in metadata. This PR resets `groupSubscription` when sending the JoinGroup request, when `ConsumerCoordinator.joinedSubscription` is updated.
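The accumulation described here can be modelled in a few lines. This is a simplified sketch, assuming that each `subscribe` adds its topics to the group subscription and that a reset trims it back to the current local subscription; `AccumulatingSubscriptionSketch` is a hypothetical class, not the real `SubscriptionState`.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical model of the pre-PR behaviour described above.
class AccumulatingSubscriptionSketch {
    private Set<String> subscription = new HashSet<>();            // local subscription
    private final Set<String> groupSubscription = new HashSet<>(); // accumulates until reset

    void subscribe(Set<String> topics) {
        subscription = new HashSet<>(topics);
        groupSubscription.addAll(topics); // old topics are not dropped here
    }

    // Trims the group subscription back to the current local subscription.
    void resetGroupSubscription() {
        groupSubscription.retainAll(subscription);
    }

    // Topics for which metadata (and hence authorization errors) is tracked.
    Set<String> metadataTopics() {
        return new HashSet<>(groupSubscription);
    }
}
```

In this model, subscribing to one topic and then to a different one leaves both in `metadataTopics()` until the reset runs, which matches the window in which the test could still see an authorization error for the topic it had already dropped.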