Skip to content

KAFKA-7965 (part-1): Fix one case which makes ConsumerBounceTest#testRollingBrokerRestartsWithSmallerMaxGroupSizeConfigDisruptsBigGroup flaky#8437

Closed
dajac wants to merge 1 commit intoapache:trunkfrom
dajac:KAFKA-7965-part-1
Closed

Conversation

@dajac
Copy link
Copy Markdown
Member

@dajac dajac commented Apr 7, 2020

I have been investigating ConsumerBounceTest#testRollingBrokerRestartsWithSmallerMaxGroupSizeConfigDisruptsBigGroup during the last week. I have identified two cases that makes it fail from times to times, especially under high resource constraints. This PR explains and propose a fix for the first case.

In a nutshell, two consumers are kicked out of the group because of the preferred leader election:

  • When the group is loaded by Group Coordinator 0 which has the limit in place, it triggers a rebalance to kick out a consumer from the group.
  • A first member is kicked out while it re-joins the group.
  • Before the rebalance completes, the controller moves the leader from 0 to 1. Therefore, the group is loaded by Group Coordinator 1 and unloaded by Group Coordinator 0. The loaded group has still all members because the rebalance did not complete, therefore it triggers another rebalance to kick out a member from the group.
  • A second member is kicked out while it re-joins the group.

The ConsumerAssignmentPoller stop themselves when an exception is raised and they report the exception. Therefore the test fails because two consumers have been kicked out from the group where it expects only one to be kicked out.

assertEquals(1, raisedExceptions.size)
assertTrue(raisedExceptions.head.isInstanceOf[GroupMaxSizeReachedException])

To mitigate this, I propose to disable the AutoLeaderRebalanceEnableProp for all the tests in ConsumerBounceTest. It makes things unpredictable and therefore increase the ricks of flakiness.

I haven't been able to get this failure again with this fix. I have run the single test for 24+ hours in a while loop within a docker contain with limited resources to verify.

Bellow, you can find the relevant traces captured when the test failed.

// Group is loaded in GroupCoordinator 0
// A rebalance is triggered because the group is over capacity
[2020-04-02 11:14:33,393] INFO [GroupMetadataManager brokerId=0] Scheduling loading of offsets and group metadata from __consumer_offsets-0 (kafka.coordinator.group.GroupMetadataManager:66)
[2020-04-02 11:14:33,406] INFO [Consumer clientId=ConsumerTestConsumer, groupId=group-max-size-test] Discovered group coordinator localhost:40071 (id: 2147483647 rack: null) (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:794)
[2020-04-02 11:14:33,409] INFO Static member MemberMetadata(memberId=ConsumerTestConsumer-ceaab707-69af-4a65-8275-cb7db7fb66b3, groupInstanceId=Some(null), clientId=ConsumerTestConsumer, clientHost=/127.0.0.1, sessionTimeoutMs=10000, rebalanceTimeoutMs=60000, supportedProtocols=List(range), ).groupInstanceId of group group-max-size-test loaded with member id ConsumerTestConsumer-ceaab707-69af-4a65-8275-cb7db7fb66b3 at generation 1. (kafka.coordinator.group.GroupMetadata$:126)
[2020-04-02 11:14:33,410] INFO Static member MemberMetadata(memberId=ConsumerTestConsumer-07077ca2-30e9-45cd-b363-30672281bacb, groupInstanceId=Some(null), clientId=ConsumerTestConsumer, clientHost=/127.0.0.1, sessionTimeoutMs=10000, rebalanceTimeoutMs=60000, supportedProtocols=List(range), ).groupInstanceId of group group-max-size-test loaded with member id ConsumerTestConsumer-07077ca2-30e9-45cd-b363-30672281bacb at generation 1. (kafka.coordinator.group.GroupMetadata$:126)
[2020-04-02 11:14:33,412] INFO Static member MemberMetadata(memberId=ConsumerTestConsumer-5d359e65-1f11-43ce-874e-fddf55c0b49d, groupInstanceId=Some(null), clientId=ConsumerTestConsumer, clientHost=/127.0.0.1, sessionTimeoutMs=10000, rebalanceTimeoutMs=60000, supportedProtocols=List(range), ).groupInstanceId of group group-max-size-test loaded with member id ConsumerTestConsumer-5d359e65-1f11-43ce-874e-fddf55c0b49d at generation 1. (kafka.coordinator.group.GroupMetadata$:126)
[2020-04-02 11:14:33,413] INFO [GroupCoordinator 0]: Loading group metadata for group-max-size-test with generation 1 (kafka.coordinator.group.GroupCoordinator:66)
[2020-04-02 11:14:33,413] INFO [GroupCoordinator 0]: Preparing to rebalance group group-max-size-test in state PreparingRebalance with old generation 1 (__consumer_offsets-0) (reason: Freshly-loaded group is over capacity (GroupConfig(10,1800000,2,0).groupMaxSize). Rebalacing in order to give a chance for consumers to commit offsets) (kafka.coordinator.group.GroupCoordinator:66)
[2020-04-02 11:14:33,431] INFO [GroupMetadataManager brokerId=0] Finished loading offsets and group metadata from __consumer_offsets-0 in 28 milliseconds, of which 0 milliseconds was spent in the scheduler. (kafka.coordinator.group.GroupMetadataManager:66)

// A first consumer is kicked out of the group while trying to re-join
[2020-04-02 11:14:33,449] ERROR [Consumer clientId=ConsumerTestConsumer, groupId=group-max-size-test] Attempt to join group failed due to fatal error: The consumer group has reached its max size. (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:627)
[2020-04-02 11:14:33,451] ERROR [daemon-consumer-assignment-2]: Error due to (kafka.api.AbstractConsumerTest$ConsumerAssignmentPoller:76)
org.apache.kafka.common.errors.GroupMaxSizeReachedException: Consumer group group-max-size-test already has the configured maximum number of members.
[2020-04-02 11:14:33,451] INFO [daemon-consumer-assignment-2]: Stopped (kafka.api.AbstractConsumerTest$ConsumerAssignmentPoller:66)

// Before the rebalance is completed, a preferred replica leader election kicks in and move the leader from 0 to 1
[2020-04-02 11:14:34,155] INFO [Controller id=0] Processing automatic preferred replica leader election (kafka.controller.KafkaController:66)
[2020-04-02 11:14:34,169] INFO [Controller id=0] Starting replica leader election (PREFERRED) for partitions group-max-size-test-0,group-max-size-test-3,__consumer_offsets-0 triggered by AutoTriggered (kafka.controller.KafkaController:66)

// The group is loaded in GroupCoordinator 1 before completing the rebalance
// Another rebalance is triggered because the group is still over capacity
[2020-04-02 11:14:34,194] INFO [GroupMetadataManager brokerId=1] Scheduling loading of offsets and group metadata from __consumer_offsets-0 (kafka.coordinator.group.GroupMetadataManager:66)
[2020-04-02 11:14:34,199] INFO Static member MemberMetadata(memberId=ConsumerTestConsumer-ceaab707-69af-4a65-8275-cb7db7fb66b3, groupInstanceId=Some(null), clientId=ConsumerTestConsumer, clientHost=/127.0.0.1, sessionTimeoutMs=10000, rebalanceTimeoutMs=60000, supportedProtocols=List(range), ).groupInstanceId of group group-max-size-test loaded with member id ConsumerTestConsumer-ceaab707-69af-4a65-8275-cb7db7fb66b3 at generation 1. (kafka.coordinator.group.GroupMetadata$:126)
[2020-04-02 11:14:34,199] INFO Static member MemberMetadata(memberId=ConsumerTestConsumer-07077ca2-30e9-45cd-b363-30672281bacb, groupInstanceId=Some(null), clientId=ConsumerTestConsumer, clientHost=/127.0.0.1, sessionTimeoutMs=10000, rebalanceTimeoutMs=60000, supportedProtocols=List(range), ).groupInstanceId of group group-max-size-test loaded with member id ConsumerTestConsumer-07077ca2-30e9-45cd-b363-30672281bacb at generation 1. (kafka.coordinator.group.GroupMetadata$:126)
[2020-04-02 11:14:34,199] INFO Static member MemberMetadata(memberId=ConsumerTestConsumer-5d359e65-1f11-43ce-874e-fddf55c0b49d, groupInstanceId=Some(null), clientId=ConsumerTestConsumer, clientHost=/127.0.0.1, sessionTimeoutMs=10000, rebalanceTimeoutMs=60000, supportedProtocols=List(range), ).groupInstanceId of group group-max-size-test loaded with member id ConsumerTestConsumer-5d359e65-1f11-43ce-874e-fddf55c0b49d at generation 1. (kafka.coordinator.group.GroupMetadata$:126)
[2020-04-02 11:14:34,201] INFO [GroupCoordinator 1]: Loading group metadata for group-max-size-test with generation 1 (kafka.coordinator.group.GroupCoordinator:66)
[2020-04-02 11:14:34,202] INFO [GroupCoordinator 1]: Preparing to rebalance group group-max-size-test in state PreparingRebalance with old generation 1 (__consumer_offsets-0) (reason: Freshly-loaded group is over capacity (GroupConfig(10,1800000,2,0).groupMaxSize). Rebalacing in order to give a chance for consumers to commit offsets) (kafka.coordinator.group.GroupCoordinator:66)
[2020-04-02 11:14:34,203] INFO [GroupMetadataManager brokerId=1] Finished loading offsets and group metadata from __consumer_offsets-0 in 9 milliseconds, of which 0 milliseconds was spent in the scheduler. (kafka.coordinator.group.GroupMetadataManager:66)

// Prefered leader election is completed
[2020-04-02 11:14:34,235] INFO [Controller id=0] Partition __consumer_offsets-0 completed preferred replica leader election. New leader is 1 (kafka.controller.KafkaController:66)

// Group is unloaded from GroupCoordinator 0
[2020-04-02 11:14:34,237] INFO [GroupMetadataManager brokerId=0] Scheduling unloading of offsets and group metadata from __consumer_offsets-0 (kafka.coordinator.group.GroupMetadataManager:66)
[2020-04-02 11:14:34,237] INFO [GroupCoordinator 0]: Unloading group metadata for group-max-size-test with generation 1 (kafka.coordinator.group.GroupCoordinator:66)
[2020-04-02 11:14:34,238] INFO [GroupMetadataManager brokerId=0] Finished unloading __consumer_offsets-0. Removed 0 cached offsets and 1 cached groups. (kafka.coordinator.group.GroupMetadataManager:66)

// A second consumer is kicked out of the group while trying to re-join
[2020-04-02 11:14:34,252] ERROR [Consumer clientId=ConsumerTestConsumer, groupId=group-max-size-test] Attempt to join group failed due to fatal error: The consumer group has reached its max size. (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:627)
[2020-04-02 11:14:34,254] ERROR [daemon-consumer-assignment-1]: Error due to (kafka.api.AbstractConsumerTest$ConsumerAssignmentPoller:76)
org.apache.kafka.common.errors.GroupMaxSizeReachedException: Consumer group group-max-size-test already has the configured maximum number of members.
[2020-04-02 11:14:34,254] INFO [daemon-consumer-assignment-1]: Stopped (kafka.api.AbstractConsumerTest$ConsumerAssignmentPoller:66)

Committer Checklist (excluded from commit message)

  • Verify design and implementation
  • Verify test coverage and CI build status
  • Verify documentation (including upgrade notes)

…RollingBrokerRestartsWithSmallerMaxGroupSizeConfigDisruptsBigGroup flaky
@chia7712
Copy link
Copy Markdown
Member

chia7712 commented Apr 7, 2020

Does the first consumer, which is kicked out, rejoin the group? The number of consumers in the group is N -1 or N - 2 after rebalance is completed?

@dajac
Copy link
Copy Markdown
Member Author

dajac commented Apr 7, 2020

Does the first consumer, which is kicked out, rejoin the group? The number of consumers in the group is N -1 or N - 2 after rebalance is completed?

@chia7712 No, it does not rejoin the group because the "pollers" used in the test suite fail fast. The number of consumers in the group after the rebalance is complete is N-2.

@hachikuji
Copy link
Copy Markdown
Contributor

@dajac Thanks, interesting investigation. I think my only concern with the patch is that it affects all of the tests in ConsumerBounceTest. Some of these test cases are verifying behavior when brokers are killed and I think the the preferred leader election ought to be covered as well.

Considering the nature of the edge case itself, would you consider it a bug? It definitely seems less than ideal that leader changes could cause some members to be kicked unnecessarily. I am wondering if we can change the eviction logic to make the process more reliable. As I understand it, the way it works currently is the following:

  1. When we load the group, we check if it is oversized. If so, we transition the group to PreparingRebalance, which alerts all members that they need to rejoin.
  2. When we get a JoinGroup, we check the current size of the group. If it is larger than needed and the member isn't awaiting the join already, then we kick it from the group by returning GROUP_MAX_SIZE_REACHED.

Combining these two, it seems that following a coordinator reload, we will always end up evicting the first members that rejoin the group. That seems surprising. Could we change that so that the last members to rejoin the group are kicked instead? If we did that, then leader changes wouldn't be a problem (I think).

@dajac
Copy link
Copy Markdown
Member Author

dajac commented Apr 9, 2020

@hachikuji I understand your concern. It is probably not a good idea to disable it for all the tests in ConsumerBounceTest.

Yeah, you're right. The behavior is less than ideal thus we could consider it as a bug. Evicting the last members that rejoin the group sounds like a good way to fix the root cause of the issue. I will give it a shot.

Thanks for your feedback!

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants