
KAFKA-15170: CooperativeStickyAssignor may fail adjust assignment #13965

Merged: ableegoldman merged 8 commits into apache:trunk from flashmouse:fix-cooperative-uniform-fail, May 15, 2024
Conversation

@flashmouse (Contributor) commented Jul 6, 2023:

This PR fixes two kinds of bugs in the new(ish) rack-aware part of the sticky assignment algorithm:

First, when reassigning "owned partitions" to their previous owners, we now have to take the rack placement into account and might not immediately assign a previously-owned partition to its old consumer during this phase. There is a small chance this partition will be assigned to its previous owner during a later stage of the assignment, but if it's not then by definition it has been "revoked" and must be removed from the assignment during the adjustment phase of the CooperativeStickyAssignor according to the cooperative protocol. We need to make sure any partitions removed in this way end up in the "partitionsTransferringOwnership".

Second, the sticky algorithm works in part by keeping track of how many consumers are still "unfilled" when they are at the "minQuota", meaning we may need to assign one more partition to get to the expected number of consumers at the "maxQuota". During the rack-aware round-robin assignment phase, we were not properly clearing the set of unfilled & minQuota consumers once we reached the expected number of "maxQuota" consumers (since by definition that means no more minQuota consumers need to or can be given any more partitions since that would bump them up to maxQuota and exceed the expected count). This bug would result in the entire assignment being failed due to a correctness check at the end which verifies that the "unfilled members" set is empty before returning the assignment. An IllegalStateException would be thrown, failing the rebalancing and sending the group into an endless rebalancing loop until/unless it was lucky enough to produce a new assignment that didn't hit this bug
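To illustrate the first point, here is a minimal, hypothetical sketch (simplified String keys rather than Kafka's actual TopicPartition and assignment-builder classes) of how a partition whose consumer changes ends up in the map of partitions transferring ownership:

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch, not Kafka's actual implementation: a partition that had
// a previous owner but is assigned to a different consumer must be recorded as
// transferring ownership so the cooperative adjustment phase can revoke it
// from the old owner before the new owner receives it.
public class OwnershipTransferSketch {
    public static Map<String, String> transferringOwnership(
            Map<String, String> previousOwner,    // partition -> old consumer
            Map<String, String> newAssignment) {  // partition -> newly assigned consumer
        Map<String, String> transferring = new HashMap<>();
        for (Map.Entry<String, String> e : newAssignment.entrySet()) {
            String oldOwner = previousOwner.get(e.getKey());
            // Ownership transfers when the partition was owned before and the
            // new consumer differs from that previous owner.
            if (oldOwner != null && !oldOwner.equals(e.getValue()))
                transferring.put(e.getKey(), e.getValue());
        }
        return transferring;
    }
}
```

The bug described above was that a previously-owned partition revoked for rack-placement reasons could miss this bookkeeping, so the adjustment phase never saw it.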

@flashmouse flashmouse changed the title MINOR: CooperativeStickyAssignor may fail adjust assignment KAFKA-15170: CooperativeStickyAssignor may fail adjust assignment Jul 10, 2023
@flashmouse (Contributor, Author) commented:

Please note two more bugs in this implementation.

The method assignRackAwareRoundRobin of ConstrainedAssignmentBuilder has two logical errors:

  1. (around line 726) When assignmentCount >= minQuota, the consumer should be added to unfilledMembersWithExactlyMinQuotaPartitions only if currentNumMembersWithOverMinQuotaPartitions < expectedNumMembersWithOverMinQuotaPartitions.
  2. (around line 739) unfilledMembersWithExactlyMinQuotaPartitions should be cleared when assignment.get(consumer).size() + 1 == maxQuota.

These errors cause verifyUnfilledMembers to fail and throw an exception, which triggers a new rebalance that produces the same assignment result, so the group may be stuck forever.
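The two conditions above can be sketched as pure helper functions; this is a simplified, hypothetical rendering of the bookkeeping, not the actual ConstrainedAssignmentBuilder code:

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical sketch of the two corrected guards (names simplified from the
// real fields discussed above).
public class QuotaGuardSketch {
    // Guard 1: a consumer that has reached minQuota may stay "unfilled" only
    // while another maxQuota slot is still expected; otherwise giving it one
    // more partition would exceed the expected number of maxQuota members.
    public static boolean staysUnfilled(int assignmentCount, int minQuota, int maxQuota,
                                        int overMinQuota, int expectedOverMinQuota) {
        return assignmentCount >= minQuota
                && assignmentCount < maxQuota
                && overMinQuota < expectedOverMinQuota;
    }

    // Guard 2: once the consumer receiving a partition would hit maxQuota and
    // fill the last expected maxQuota slot, the whole set of
    // unfilled-at-minQuota members must be cleared.
    public static void maybeClearUnfilled(Set<String> unfilledAtMinQuota, int newCount,
                                          int maxQuota, int overMinQuota,
                                          int expectedOverMinQuota) {
        if (newCount == maxQuota && overMinQuota + 1 == expectedOverMinQuota)
            unfilledAtMinQuota.clear();
    }
}
```

Without guard 2, members left in the unfilled set make the final verifyUnfilledMembers check throw, which is the endless-rebalance symptom described above.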

@flashmouse (Contributor, Author) added:

I suggest verifyUnfilledMembers shouldn't throw an exception: the condition it checks only means the assignment is not fully balanced, which is still a valid result, and we cannot allow the rebalance to get stuck forever in such a situation.

@flashmouse (Contributor, Author) commented:

@rreddy-22 I see you are familiar with this implementation, please help me check this, thanks!

@guozhangwang (Contributor) left a comment:

ping @rajinisivaram @ableegoldman to take a look since you have the most context about sticky assignor with rack info.

github-actions Bot commented Nov 6, 2023:

This PR is being marked as stale since it has not had any activity in 90 days. If you would like to keep this PR alive, please ask a committer for review. If the PR has merge conflicts, please update it with the latest from trunk (or the appropriate release branch).

If this PR is no longer valid or desired, please feel free to close it. If no activity occurs in the next 30 days, it will be automatically closed.

@github-actions github-actions Bot added the stale Stale PRs label Nov 6, 2023
@ijuma ijuma removed the stale Stale PRs label Feb 23, 2024
@ijuma ijuma requested a review from rajinisivaram April 4, 2024 02:58
@ableegoldman (Member) commented:

@flashmouse can you rebase this PR please?

@flashmouse (Contributor, Author) replied:

Updated to the latest commit now, @ableegoldman.

@ableegoldman (Member) left a comment:

LGTM, left a few suggestions to improve readability and hopefully prevent any more bugs like this from slipping in in the future.

Nice find by the way! Thanks for the fix

  if (assignmentCount >= minQuota) {
      unfilledMembersWithUnderMinQuotaPartitions.remove(consumer);
-     if (assignmentCount < maxQuota)
+     if (assignmentCount < maxQuota && (currentNumMembersWithOverMinQuotaPartitions < expectedNumMembersWithOverMinQuotaPartitions)) {

This fix makes sense, good find. But let's add a comment because obviously this is difficult to understand if the bug slipped in.

Something like:

// Only add this consumer if the current num members at maxQuota is less than the expected number
// since a consumer at minQuota can only be considered unfilled if it's possible to add another partition,
// which would bump it to maxQuota and exceed the expectedNumMembersWithOverMinQuotaPartitions


private final Set<TopicPartition> partitionsWithMultiplePreviousOwners;
private final Set<TopicPartition> allRevokedPartitions;
private final Map<TopicPartition, String> mayRevokedPartitions;

nit: this name is a bit confusing, how about maybeRevokedPartitions?

@flashmouse (Contributor, Author) replied:

Modified, please check again.

Another comment:

maybeRevokedPartitions sounds comprehensive and short; it is indeed a nice choice of words.

More options for those who do not prefer 'maybe': possiblyRevokedPartitions or perhapsRevokedPartitions.

A Member replied:

> More options for who do not prefer 'maybe' may be: possiblyRevokedPartitions, perhapsRevokedPartitions.

I actually considered both of those as well -- ultimately went with "maybe" since the variables in this class are all way too long already, but I'm happy with any of these choices 🙂

@ableegoldman (Member) left a comment:

One more small thing, but after that I'm happy to merge it.

  // Keep track of the partitions being migrated from one consumer to another during assignment
  // so the cooperative assignor can adjust the assignment
- protected Map<TopicPartition, String> partitionsTransferringOwnership = new HashMap<>();
+ public Map<TopicPartition, String> partitionsTransferringOwnership = new HashMap<>();

Instead of making this public can you actually make it private and just add a getter method that returns it? We don't want any class fields being modifiable from the outside

@flashmouse (Contributor, Author) replied:

That makes sense, but setting it to private would require many modifications in org.apache.kafka.clients.consumer.internals.AbstractStickyAssignorTest. I think you mean protected is enough? Changed it to protected now.

return partitionMovements.isSticky();
}

public Map<TopicPartition, String> getPartitionsTransferringOwnership() {

Forgot to mention, it's not a big deal but typically we don't include the "get" in getter names in the Kafka clients. Just a naming convention. ie this would be just partitionsTransferringOwnership()

@flashmouse (Contributor, Author) replied:

I know, please check again.

@ableegoldman (Member) commented:

Test failures are unrelated, merging to trunk

@ableegoldman ableegoldman merged commit f0291ac into apache:trunk May 15, 2024
ableegoldman pushed a commit that referenced this pull request May 15, 2024


Reviewers: Anna Sophie Blee-Goldman <ableegoldman@apache.org>
@ableegoldman (Member) commented:

Merged to trunk and cherry-picked back to 3.7.

TaiJuWu pushed a commit to TaiJuWu/kafka that referenced this pull request Jun 8, 2024
…che#13965)

gongxuanzhang pushed a commit to gongxuanzhang/kafka that referenced this pull request Jun 12, 2024
…che#13965)


Labels: none yet. Projects: none yet. 5 participants.