KAFKA-12959: Distribute standby and active tasks across threads to better balance load between threads #11493
This contribution is my original work and I license the work to the project under the Apache version 2 licence |
Hey @ableegoldman, I see you've got a few commits on this file. Would you be able to review, or point us in the direction of one of your co-committers who might be able to help us here?
@tim-patterson , thanks for the PR. I'll review the PR next week. Thanks for the improvement. |
showuon left a comment
@tim-patterson , thanks for the patch! The algorithm changes make sense to me. Left some comments. Thanks.
| // Update threadLoad | ||
| for (final Map.Entry<String, List<TaskId>> taskEntry : assignment.entrySet()) { | ||
| final String consumer = taskEntry.getKey(); | ||
| final int totalCount = threadLoad.getOrDefault(consumer, 0) + taskEntry.getValue().size(); | ||
| threadLoad.put(consumer, totalCount); | ||
| } | ||
|
|
Should we keep the threadLoad update at the end of assignStatelessTasksToThreads? We don't need it after assignStatelessTasksToThreads, right?
| final Set<TaskId> prevTasks00 = mkSet(TASK_0_0); | ||
| final Set<TaskId> standbyTasks = mkSet(TASK_0_0, TASK_0_1, TASK_0_2); | ||
|
|
||
| createMockTaskManager(prevTasks00, standbyTasks); |
The prevTasks00 and standbyTasks are meaningless in this test. If we just want to create a mock taskManager, could we use createMockTaskManager(EMPTY_TASKS, EMPTY_TASKS); instead?
| threadTaskCounts | ||
| ); | ||
|
|
||
| final Map<String, List<TaskId>> activeTaskAssignment = activeTaskStatefulAssignment; |
Could we add a comment here to mention that this combines the active stateful assignment with the active stateless assignment?
| if (unassignedStatefulTasks.contains(task)) { | ||
| if (threadAssignment.size() < minStatefulTasksPerThread) { | ||
| if (unassignedTasks.contains(task)) { | ||
| final int threadTaskCount = threadAssignment.size() + threadLoad.getOrDefault(consumer, 0); |
threadLoad is not changed before the end of the assignTasksToThreads method. Could we get the value: threadLoad.getOrDefault(consumer, 0); at the beginning of the consumers loop?
Also, to compute the threadTaskCount, we need to add threadLoad.getOrDefault(consumer, 0) each time. Could we just set minTasksPerThread = minTasksPerThread - threadLoad.getOrDefault(consumer, 0), with a comment to make it clear, so that we don't need to add threadLoad.getOrDefault(consumer, 0) each time? WDYT?
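The suggestion above rests on a simple equivalence: checking `assigned + load < min` is the same as checking `assigned < min - load`, so the existing load can be looked up once per consumer. A minimal sketch of that idea follows. The class and method names are hypothetical and are not taken from the PR, and consumers are represented by plain strings.

```java
import java.util.HashMap;
import java.util.Map;

public class RemainingCapacityExample {

    // Hypothetical helper: how many more tasks a consumer may take before it
    // reaches minTasksPerThread, given the load it carries from earlier calls.
    static int remainingCapacity(int minTasksPerThread, Map<String, Integer> threadLoad, String consumer) {
        return Math.max(0, minTasksPerThread - threadLoad.getOrDefault(consumer, 0));
    }

    public static void main(String[] args) {
        Map<String, Integer> threadLoad = new HashMap<>();
        threadLoad.put("thread-1", 2); // already holds two tasks from an earlier call

        // Looked up once per consumer, then compared against the size of the new assignment only.
        System.out.println(remainingCapacity(3, threadLoad, "thread-1"));
        System.out.println(remainingCapacity(3, threadLoad, "thread-2"));
    }
}
```

Clamping at zero is a choice made for this sketch: a consumer that is already above the minimum simply has no remaining capacity.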
| // stateful tasks still remaining that should now be distributed over the consumers | ||
| if (!unassignedStatefulTasks.isEmpty()) { | ||
| consumersToFill.addAll(consumers); | ||
| // At this point all consumers are at the min or min + 1 capacity, |
Where does the min + 1 case come from? Could you elaborate more?
It took me a while to figure it out again.
Imagine the case where there are 4 threads, 5 active tasks and 2 standby tasks.
After assigning the actives, 1 of the threads already has 2 tasks.
So when it comes to assigning the standbys, minTasksPerThread is calculated again as 1, so before we've even assigned any standbys we already have a thread at min + 1.
Thanks for raising this, I'll add a comment :)
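The scenario above can be checked with a small standalone sketch. It assumes that minTasksPerThread is the integer quotient of total tasks by thread count, and it uses a hypothetical `spread` helper that hands each task to the least-loaded thread. Neither is taken from the PR's code.

```java
import java.util.Arrays;

public class MinPlusOneExample {

    // Hypothetical helper: give each task to the currently least-loaded thread.
    static int[] spread(int[] load, int tasks) {
        int[] result = Arrays.copyOf(load, load.length);
        for (int t = 0; t < tasks; t++) {
            int least = 0;
            for (int i = 1; i < result.length; i++) {
                if (result[i] < result[least]) {
                    least = i;
                }
            }
            result[least]++;
        }
        return result;
    }

    public static void main(String[] args) {
        int threads = 4;
        int actives = 5;
        int standbys = 2;

        // Load per thread once the active tasks have been placed.
        int[] afterActives = spread(new int[threads], actives);

        // Minimum recomputed over actives + standbys, before any standby is placed.
        int minTasksPerThread = (actives + standbys) / threads;

        System.out.println("load after actives: " + Arrays.toString(afterActives));
        System.out.println("minTasksPerThread for standbys: " + minTasksPerThread);
        System.out.println("load after standbys: " + Arrays.toString(spread(afterActives, standbys)));
    }
}
```

One thread holds 2 tasks while the recomputed minimum is 7 / 4 = 1, which is the min + 1 state described above.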
| if (taskCount == minTasksPerThread) { | ||
| consumersToFill.add(consumer); |
I think the original algorithm assumes that, on reaching this step, all consumers are at the min capacity, so it could simply set consumersToFill like this:
if (!unassignedStatefulTasks.isEmpty())
consumersToFill.addAll(consumers);
But I didn't see where we change this algorithm. Please let me know what I missed. Thanks.
As commented above
| int maxThreadLoad = 0; | ||
| for (final int load : threadLoad.values()) { | ||
| maxThreadLoad = Integer.max(maxThreadLoad, load); |
I was thinking we don't need these loops to get maxThreadLoad and consumersToFill. Do you think we could put assignStatelessTasksToThreads at the end of assignStatefulTasksToThreads, so that we have maxThreadLoad and consumersToFill directly? The assignStatefulTasksToThreads method signature would then take one more parameter, like boolean shouldAssignStatelessTasks or isActiveTasksAssignment, something like that. WDYT?
Yeah, we can certainly try something like that.
Instead of a boolean, the method could take statefulTasks and statelessTasks; for the first call we could pass statelessTasks = Collections.emptySet().
I'll get the other stuff fixed first and then push it up as a separate commit.
Let me know what you think of the latest commit.
Instead of gluing the methods together I just got rid of assignStatelessTasksToThreads, the only real difference between the two methods was the stateful one trying to be sticky.
I just passed in a boolean to control the part of the method that does the stickiness stuff.
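To illustrate the shape described above, here is a heavily simplified sketch of a single assignment method with a boolean that switches the sticky step on or off. It is not the PR's implementation: the class, method, and parameter names are hypothetical, tasks and consumers are plain strings rather than TaskId and consumer objects, and the real code interleaves tasks by subtopology, which is omitted here.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class UnifiedAssignmentSketch {

    // Hypothetical stand-in for the merged method; threadLoad carries load across calls.
    static Map<String, List<String>> assignTasksToThreads(
            List<String> tasksToAssign,
            List<String> consumers,
            Map<String, String> previousOwners,
            Map<String, Integer> threadLoad,
            boolean sticky) {

        int totalTasks = tasksToAssign.size();
        for (int load : threadLoad.values()) {
            totalTasks += load;
        }
        int minTasksPerThread = totalTasks / consumers.size();

        Map<String, List<String>> assignment = new LinkedHashMap<>();
        for (String consumer : consumers) {
            assignment.put(consumer, new ArrayList<>());
        }

        List<String> unassigned = new ArrayList<>(tasksToAssign);
        if (sticky) {
            // Sticky step: return a task to its previous owner while that owner is below the minimum.
            for (String task : tasksToAssign) {
                String owner = previousOwners.get(task);
                if (owner != null && assignment.containsKey(owner)
                        && currentLoad(owner, assignment, threadLoad) < minTasksPerThread) {
                    assignment.get(owner).add(task);
                    unassigned.remove(task);
                }
            }
        }

        // Shared step: every remaining task goes to the least-loaded consumer.
        for (String task : unassigned) {
            String least = consumers.get(0);
            for (String consumer : consumers) {
                if (currentLoad(consumer, assignment, threadLoad) < currentLoad(least, assignment, threadLoad)) {
                    least = consumer;
                }
            }
            assignment.get(least).add(task);
        }

        // Record the new load so the next call (for example, standbys) balances against it.
        for (Map.Entry<String, List<String>> entry : assignment.entrySet()) {
            threadLoad.merge(entry.getKey(), entry.getValue().size(), Integer::sum);
        }
        return assignment;
    }

    static int currentLoad(String consumer, Map<String, List<String>> assignment, Map<String, Integer> threadLoad) {
        return assignment.get(consumer).size() + threadLoad.getOrDefault(consumer, 0);
    }
}
```

Calling this once with `sticky = true` for stateful tasks and again with `sticky = false` for the others, while passing the same `threadLoad` map, is what lets the later call even out any imbalance left by the earlier one.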
| assertEquals(1, info10.activeTasks().size() + info10.standbyTasks().size()); | ||
| assertEquals(1, info11.activeTasks().size() + info11.standbyTasks().size()); | ||
| assertEquals(1, info12.activeTasks().size() + info12.standbyTasks().size()); | ||
| assertEquals(0, info13.activeTasks().size() + info13.standbyTasks().size()); | ||
| assertEquals(1, info20.activeTasks().size() + info20.standbyTasks().size()); | ||
| assertEquals(1, info21.activeTasks().size() + info21.standbyTasks().size()); | ||
| assertEquals(1, info22.activeTasks().size() + info22.standbyTasks().size()); |
I'm thinking we can just verify the assignment size is < 2. We don't care about the exact number, right?
| } | ||
|
|
||
| @Test | ||
| public void testAssignWithStandbyReplicasBalance() { |
Could we add some tests to cover the case where there are fewer threads than total tasks (active + standby), but we can still distribute them evenly (this case should be more common for users)? For example:
tasks: TASK_0_0, TASK_0_1, TASK_0_2
consumers: 2 in thread1, 3 in thread2
We expect each thread to have 3 tasks, distributed evenly.
Also, some tests to make sure the statelessTasks are also taking the thread load into account. Thanks.
Thanks for the review @showuon.
showuon left a comment
Thanks for the update. Left some comments. Waiting for your next update. Thanks.
| if (threadAssignment.size() < minStatefulTasksPerThread) { | ||
| if (unassignedTasks.contains(task)) { | ||
| final int threadTaskCount = threadAssignment.size(); | ||
| if (threadTaskCount < tasksTargetCount) { |
nit: we can use threadAssignment.size() to replace the threadTaskCount variable. Same as below.
| // At this point all consumers are at the min or min + 1 capacity. | ||
| // The min + 1 case can occur for standbys where there's fewer standbys than consumers and after assigning | ||
| // the active tasks some consumers already have min + 1 one tasks assigned. | ||
| // The tasks still remaining should now be distributed over the consumers that are still at min capacity |
Thanks for your explanation. I've got it now: it's because we've changed the task count from the original active tasks only to the total, which includes the active tasks. Thanks.
showuon left a comment
@tim-patterson , thanks for the update. The stateless tasks update LGTM. It is indeed able to re-use the same logic as the stateful tasks, just no prev-owner sticky assignments. Left some comments. Thanks.
| // Check each consumer has 3 tasks | ||
| assertEquals(3, info10.activeTasks().size() + info10.standbyTasks().size()); | ||
| assertEquals(3, info20.activeTasks().size() + info20.standbyTasks().size()); | ||
| // Check that not all the actives are on one node | ||
| assertTrue(info10.activeTasks().size() < 3); | ||
| assertTrue(info20.activeTasks().size() < 3); |
| getInfo(UUID_2, EMPTY_TASKS, EMPTY_TASKS, USER_END_POINT).encode())); | ||
|
|
||
| final Map<String, Assignment> assignments = | ||
| partitionAssignor.assign(metadata, new GroupSubscription(subscriptions)).groupAssignment(); |
I ran it and found there are no stateless tasks included in the assignment. Could you check again?
| assignment.get(consumer).add(task); | ||
| consumersToFill.offer(consumer); | ||
| // Update threadLoad | ||
| for (final Map.Entry<String, List<TaskId>> taskEntry : assignment.entrySet()) { |
Could we skip this threadLoad update for stateless tasks?
My only worry with that is that to get a proper balance it would rely on the caller to always assign stateless tasks last which might not be clear to the caller.
Happy to stick an if statement around it if you think it's worth it though.
Makes sense to me. We can keep it as is. Thanks.
| int totalTasks = tasksToAssign.size(); | ||
| for (final Integer threadTaskCount : threadLoad.values()) { | ||
| totalTasks += threadTaskCount; | ||
| } |
nit: we can use stream here:
int totalTasks = threadLoad.values().stream().reduce(tasksToAssign.size(), Integer::sum);
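The loop in the diff and the stream one-liner suggested above compute the same total, as the following standalone sketch shows. The class and method names are hypothetical, and task ids are plain strings standing in for TaskId.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class TotalTasksExample {

    // Explicit loop, as in the snippet under review.
    static int totalWithLoop(List<String> tasksToAssign, Map<String, Integer> threadLoad) {
        int totalTasks = tasksToAssign.size();
        for (final Integer threadTaskCount : threadLoad.values()) {
            totalTasks += threadTaskCount;
        }
        return totalTasks;
    }

    // Stream form from the review comment: the number of new tasks seeds the reduction.
    static int totalWithStream(List<String> tasksToAssign, Map<String, Integer> threadLoad) {
        return threadLoad.values().stream().reduce(tasksToAssign.size(), Integer::sum);
    }

    public static void main(String[] args) {
        Map<String, Integer> threadLoad = new HashMap<>();
        threadLoad.put("thread-1", 2);
        threadLoad.put("thread-2", 1);
        List<String> tasksToAssign = Arrays.asList("0_0", "0_1", "0_2");

        System.out.println(totalWithLoop(tasksToAssign, threadLoad));
        System.out.println(totalWithStream(tasksToAssign, threadLoad));
    }
}
```

One caveat: the seed passed to `reduce` is not a true identity for addition, so this form is only safe on a sequential stream. With a parallel stream the seed could be added once per chunk.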
| * Generate an assignment that tries to preserve thread-level stickiness for stateful tasks without violating | ||
| * balance. The tasks are balanced across threads. Stateful tasks without previous owners will be interleaved by | ||
| * group id to spread subtopologies across threads and further balance the workload. | ||
| * threadLoad is a map that keeps track of task load per thread across multiple calls so actives and standbys | ||
| * are evenly distributed |
Should we mention here that for stateless tasks, we just distribute them evenly to all consumers?
@cadonna, would you like to take a look at the PR as a second pair of eyes? Thanks.
showuon left a comment
LGTM! Thanks for the improvement! I'll wait until next week to see if @cadonna or @ableegoldman have time for another review. Thank you.
|
@tim-patterson , could we rename the PR title? It seems that we don't just assign standby tasks to |
|
Failed tests are flaky tests that also failed in trunk branch: |
|
@tim-patterson , thanks for the contribution! |
Kafka Streams: currently, while distributing standby tasks, Streams does not check whether there are threads with no tasks or with fewer tasks than others. This can lead to a few threads being assigned both active and standby tasks while other threads within the same instance have no tasks assigned.
This PR takes the active task assignment into account when assigning standbys to threads, to achieve a better balance of tasks across threads.