Skip to content

KAFKA-13764: Improve balancing algorithm for Connect incremental rebalancing#12019

Closed
C0urante wants to merge 5 commits intoapache:trunkfrom
C0urante:kafka-13764
Closed

KAFKA-13764: Improve balancing algorithm for Connect incremental rebalancing#12019
C0urante wants to merge 5 commits intoapache:trunkfrom
C0urante:kafka-13764

Conversation

@C0urante
Copy link
Copy Markdown
Contributor

@C0urante C0urante commented Apr 8, 2022

Jira

Depends on #11983

The primary goal of this PR is to address several outstanding issues with incremental rebalancing that lead to stable-but-unbalanced clusters. However, other small bug fixes are also applied, and some liberty is taken with refactoring to improve readability and flexibility in the code base.

This should also address KAFKA-12495, and includes an adapted test case from #10367, which addresses that issue but with a different approach.

High-level changes:

  • Refine the logic for load-balancing revocations:
    • Perform load-balancing revocations any time the cluster appears imbalanced and there are still connectors and tasks that can be revoked from workers, instead of only when the number of workers in the cluster changes
    • Remove the "rough estimation" logic and replace it with a precise calculation of exactly how to allocate all currently-configured connectors and tasks as evenly as possible across a cluster
    • Account for load-balancing revocations when assigning new and lost connectors and tasks across the cluster
  • Improve code quality:
    • Extract the ConnectorsAndTasks class into its own file, enrich it and its builder class with developer-friendly methods, make its contents completely immutable, and use Set instead of generic Collection instances to store connectors and tasks
    • Where possible, identify logic that is shared for connectors and tasks (IncrementalCooperativeAssignor::assignConnectors and IncrementalCooperativeAssignor::assignTasks, for example) and abstract it into a single reusable method
    • Use the final keyword for base and derived sets in IncrementalCooperativeAssignor::performTaskAssignment (tracking mutations across a 100+ line method is difficult)
    • Reword unnecessary and confusing comments ("... is a derived set from the set difference of ..." is not very informative)
    • Reorganize the grouping of methods in IncrementalCooperativeAssignor to place static utility methods together at the bottom of the class
    • Demote visibility of testing-only methods and fields from protected to package-private (protected implies that the field/method is intended for use by subclasses, which is not the case for any of these)
  • Testing:
    • Add several new tests to cover a variety of new cases, many of which result in imbalanced allocation with the current rebalancing logic, but which are all correctly handled with the improvements in this PR
    • Add a few testing utility methods to help "hand wave" test cases without having to specify fine-grained expectations like how many rounds of rebalance are required to reach stability after some changes have been applied to the cluster
    • Add coverage to all tests that ensures that no connectors or tasks are both revoked and assigned from the same worker, and that the leader's view of the complete assignment of connectors and tasks across the cluster appears to be correct after each rebalance
  • Miscellaneous:
    • Demote a ton of noisy DEBUG-level log messages to TRACE

Committer Checklist (excluded from commit message)

  • Verify design and implementation
  • Verify test coverage and CI build status
  • Verify documentation (including upgrade notes)

… around tests for detecting unexpected rebalances, introduce failing test due to lack of consecutive revocations
@C0urante
Copy link
Copy Markdown
Contributor Author

Apologies @showuon, this does not actually (fully) address KAFKA-12495. Several of the test cases here relied on rebalances being triggered in circumstances that they would not normally be triggered under, which caused the issue with not performing consecutive revocations to be masked.

I've added a new failing test case that's very similar to the one in #10367 but which fails with all functional and testing framework changes I've made so far in this pull request.

At this point I don't see too many alternatives to permitting consecutive revocations, but here are a few that come to mind when the cluster is imbalanced but a revocation took place during the last round:

  • Have the leader send out an assignment with no revocations but also a delay of 1ms to trigger an immediate follow-up rebalance for revocations
  • Have every worker automatically rejoin the cluster whenever they receive an assignment that contains revoked connectors/tasks (like now) or newly-assigned connectors/tasks, until a rebalance takes place that doesn't change any worker's assigned connectors/tasks
  • Have the leader immediately trigger a new rebalance after completing this one by rejoining the group

These (and the strategy of permitting consecutive revocations) all fall under the umbrella of https://cwiki.apache.org/confluence/display/KAFKA/KIP-415%3A+Incremental+Cooperative+Rebalancing+in+Kafka+Connect#KIP415:IncrementalCooperativeRebalancinginKafkaConnect-ChangestoConnect'sRebalancingProcess, which outlines this strategy:

  • When a Worker is elected as Leader, it computes a new assignment, describing both assigned and revoked connectors and tasks (previously the Leader computed an assignment from scratch without defining revoked resources).
  • When a Worker receives its assignment, if this assignment includes any revoked connectors or tasks, it stops (releases) these resources and then immediately rejoins the group with an assignment that excludes revoked resources (previously, upon receipt of assignment, the Worker started the connectors and tasks and operated in the new generation of the group protocol until the next rebalance some time in the future).
  • Normally in the next assignment round, the Leader will assign resources according to its policy and there will be no revoked resources in any of the Workers. If that's not the case, the previous steps will be repeated until the group converges into an assignment without revocations.

(Emphasis mine)

Overall I think permitting consecutive revocations is the safest and most intuitive option here, and with the new testing logic in this PR (especially the new assertNoFurtherAssignments method), we get pretty extensive coverage to help ensure we don't get trapped in an infinite revocation loop.

…/distributed/IncrementalCooperativeAssignor.java

Co-authored-by: YEONCHEOL JANG <65603611+YeonCheolGit@users.noreply.github.com>
@yeoncheol-jang
Copy link
Copy Markdown

@C0urante
Hello! I am just wondering that how this PR is going? Is threre anything that i can help with?

@C0urante
Copy link
Copy Markdown
Contributor Author

Hi @YeonCheolGit! There's always more PRs to review than there are reviewers (especially with Connect), so feel free to give this (or probably #11983, which this PR depends on) a review if you'd like to help.

}
return performTaskAssignment(leaderId, leaderOffset, memberConfigs, coordinator, protocolVersion);
Map<String, ByteBuffer> result = serializeAssignments(assignments);
log.debug("Finished assignment");
Copy link
Copy Markdown

@yeoncheol-jang yeoncheol-jang Apr 30, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@C0urante

This works with Map<String, ExtendedAssignment> assignment's'.
So maybe this?

Suggested change
log.debug("Finished assignment");
log.debug("Finished assignments");

log.debug("Complete (ignoring deletions) connector assignments: {}", connectorAssignments);
// The connectors and tasks that should already be running on the cluster, but which are not included
// in the assignment reported by any workers in the cluster
final ConnectorsAndTasks lostAssignments = ConnectorsAndTasks.diff(previousAssignment, activeAssignments, deleted);
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you explain what lost assignment meaning is?
As far as i know ConnectorsAndTasks.diff returns remainder after subtracted assignments.

Comment on lines 629 to +631
while (load.hasNext()) {
int firstLoad = first.tasksSize();
int upTo = IntStream.range(0, workerAssignment.size())
.filter(i -> workerAssignment.get(i).tasksSize() > firstLoad)
int firstLoad = allocationSize.apply(first);
int upTo = IntStream.range(0, workers.size())
Copy link
Copy Markdown

@yeoncheol-jang yeoncheol-jang May 1, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is minor suggestion and could be ignored.
If calculate workers.size in while loop it has to be calculated all the time while it is true.
What about calculating size one time then use it many times.

final int workersSize = workers.size();
IntStream.range(0, workersSize)

* <li>The allocation of connectors and tasks across the cluster were as balanced as possible (i.e., the difference in allocation size between any two workers is at most one)</li>
* <li>Any workers that left the group within the scheduled rebalance delay permanently left the group</li>
* <li>All currently-configured connectors and tasks were allocated (including instances that may be revoked in this round because they are duplicated across workers)</li>
* </ul>
Copy link
Copy Markdown

@yeoncheol-jang yeoncheol-jang May 1, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe this is silly question and i don't know this things much.
Is there any reason to put HTML tag for comments?

}

return revoking;
private int calculateDelay(long now) {
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I could see using many final parameters in Kafka code. So this just for code convention and safety.

Suggested change
private int calculateDelay(long now) {
private int calculateDelay(final long now) {

@C0urante
Copy link
Copy Markdown
Contributor Author

Converting this to a draft since I haven't had time to prioritize it (sorry @YeonCheolGit!) and the changes here are not safe to merge as-are.

@C0urante C0urante marked this pull request as draft July 26, 2022 01:55
@yeoncheol-jang
Copy link
Copy Markdown

Converting this to a draft since I haven't had time to prioritize it (sorry @YeonCheolGit!) and the changes here are not safe to merge as-are.

No worries @C0urante! All good and thanks for letting me know this:)

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants