-
Notifications
You must be signed in to change notification settings - Fork 15.1k
KAFKA-12959: Distribute standby and active tasks across threads to better balance load between threads #11493
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
2acb7d0
b7fc6e9
34acba5
5f186a3
1d8b25d
9f2a457
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -60,7 +60,6 @@ | |
| import java.util.Comparator; | ||
| import java.util.HashMap; | ||
| import java.util.HashSet; | ||
| import java.util.Iterator; | ||
| import java.util.LinkedList; | ||
| import java.util.List; | ||
| import java.util.Map; | ||
|
|
@@ -784,21 +783,39 @@ private Map<String, Assignment> computeNewAssignment(final Set<TaskId> statefulT | |
| final ClientMetadata clientMetadata = clientEntry.getValue(); | ||
| final ClientState state = clientMetadata.state; | ||
| final SortedSet<String> consumers = clientMetadata.consumers; | ||
| final Map<String, Integer> threadTaskCounts = new HashMap<>(); | ||
|
|
||
| final Map<String, List<TaskId>> activeTaskAssignment = assignTasksToThreads( | ||
| final Map<String, List<TaskId>> activeTaskStatefulAssignment = assignTasksToThreads( | ||
| state.statefulActiveTasks(), | ||
| state.statelessActiveTasks(), | ||
| true, | ||
| consumers, | ||
| state | ||
| state, | ||
| threadTaskCounts | ||
| ); | ||
|
|
||
| final Map<String, List<TaskId>> standbyTaskAssignment = assignTasksToThreads( | ||
| state.standbyTasks(), | ||
| Collections.emptySet(), | ||
| true, | ||
| consumers, | ||
| state, | ||
| threadTaskCounts | ||
| ); | ||
|
|
||
| final Map<String, List<TaskId>> activeTaskStatelessAssignment = assignTasksToThreads( | ||
| state.statelessActiveTasks(), | ||
| false, | ||
| consumers, | ||
| state | ||
| state, | ||
| threadTaskCounts | ||
| ); | ||
|
|
||
| // Combine activeTaskStatefulAssignment and activeTaskStatelessAssignment together into | ||
| // activeTaskStatelessAssignment | ||
| final Map<String, List<TaskId>> activeTaskAssignment = activeTaskStatefulAssignment; | ||
| for (final Map.Entry<String, List<TaskId>> threadEntry : activeTaskStatelessAssignment.entrySet()) { | ||
| activeTaskAssignment.get(threadEntry.getKey()).addAll(threadEntry.getValue()); | ||
| } | ||
|
|
||
| // Arbitrarily choose the leader's client to be responsible for triggering the probing rebalance, | ||
| // note once we pick the first consumer within the process to trigger probing rebalance, other consumer | ||
| // would not set to trigger any more. | ||
|
|
@@ -1028,116 +1045,111 @@ private Map<TaskId, Set<TopicPartition>> buildStandbyTaskMap(final String consum | |
| } | ||
|
|
||
| /** | ||
| * Generate an assignment that tries to preserve thread-level stickiness of stateful tasks without violating | ||
| * balance. The stateful and total task load are both balanced across threads. Tasks without previous owners | ||
| * will be interleaved by group id to spread subtopologies across threads and further balance the workload. | ||
| * Generate an assignment that tries to preserve thread-level stickiness for stateful tasks without violating | ||
| * balance. The tasks are balanced across threads. Stateful tasks without previous owners will be interleaved by | ||
| * group id to spread subtopologies across threads and further balance the workload. | ||
| * Stateless tasks are simply spread across threads without taking into account previous ownership. | ||
| * threadLoad is a map that keeps track of task load per thread across multiple calls so active and standby | ||
| * tasks are evenly distributed | ||
| */ | ||
| static Map<String, List<TaskId>> assignTasksToThreads(final Collection<TaskId> statefulTasksToAssign, | ||
| final Collection<TaskId> statelessTasksToAssign, | ||
| static Map<String, List<TaskId>> assignTasksToThreads(final Collection<TaskId> tasksToAssign, | ||
| final boolean isStateful, | ||
| final SortedSet<String> consumers, | ||
| final ClientState state) { | ||
| final ClientState state, | ||
| final Map<String, Integer> threadLoad) { | ||
| final Map<String, List<TaskId>> assignment = new HashMap<>(); | ||
| for (final String consumer : consumers) { | ||
| assignment.put(consumer, new ArrayList<>()); | ||
| } | ||
|
|
||
| final List<TaskId> unassignedStatelessTasks = new ArrayList<>(statelessTasksToAssign); | ||
| Collections.sort(unassignedStatelessTasks); | ||
|
|
||
| final Iterator<TaskId> unassignedStatelessTasksIter = unassignedStatelessTasks.iterator(); | ||
| final int totalTasks = threadLoad.values().stream().reduce(tasksToAssign.size(), Integer::sum); | ||
|
|
||
| final int minStatefulTasksPerThread = (int) Math.floor(((double) statefulTasksToAssign.size()) / consumers.size()); | ||
| final PriorityQueue<TaskId> unassignedStatefulTasks = new PriorityQueue<>(statefulTasksToAssign); | ||
| final int minTasksPerThread = (int) Math.floor(((double) totalTasks) / consumers.size()); | ||
| final PriorityQueue<TaskId> unassignedTasks = new PriorityQueue<>(tasksToAssign); | ||
|
|
||
| final Queue<String> consumersToFill = new LinkedList<>(); | ||
| // keep track of tasks that we have to skip during the first pass in case we can reassign them later | ||
| // using tree-map to make sure the iteration ordering over keys are preserved | ||
| final Map<TaskId, String> unassignedTaskToPreviousOwner = new TreeMap<>(); | ||
|
|
||
| if (!unassignedStatefulTasks.isEmpty()) { | ||
| // First assign stateful tasks to previous owner, up to the min expected tasks/thread | ||
| if (!unassignedTasks.isEmpty()) { | ||
| // First assign tasks to previous owner, up to the min expected tasks/thread if these are stateful | ||
| for (final String consumer : consumers) { | ||
| final List<TaskId> threadAssignment = assignment.get(consumer); | ||
|
|
||
| for (final TaskId task : state.prevTasksByLag(consumer)) { | ||
| if (unassignedStatefulTasks.contains(task)) { | ||
| if (threadAssignment.size() < minStatefulTasksPerThread) { | ||
| threadAssignment.add(task); | ||
| unassignedStatefulTasks.remove(task); | ||
| } else { | ||
| unassignedTaskToPreviousOwner.put(task, consumer); | ||
| // The number of tasks we have to assign here to hit minTasksPerThread | ||
| final int tasksTargetCount = minTasksPerThread - threadLoad.getOrDefault(consumer, 0); | ||
|
|
||
| if (isStateful) { | ||
| for (final TaskId task : state.prevTasksByLag(consumer)) { | ||
| if (unassignedTasks.contains(task)) { | ||
| if (threadAssignment.size() < tasksTargetCount) { | ||
| threadAssignment.add(task); | ||
| unassignedTasks.remove(task); | ||
| } else { | ||
| unassignedTaskToPreviousOwner.put(task, consumer); | ||
| } | ||
| } | ||
| } | ||
| } | ||
|
|
||
| if (threadAssignment.size() < minStatefulTasksPerThread) { | ||
| if (threadAssignment.size() < tasksTargetCount) { | ||
| consumersToFill.offer(consumer); | ||
| } | ||
| } | ||
|
|
||
| // Next interleave remaining unassigned tasks amongst unfilled consumers | ||
| while (!consumersToFill.isEmpty()) { | ||
| final TaskId task = unassignedStatefulTasks.poll(); | ||
| final TaskId task = unassignedTasks.poll(); | ||
| if (task != null) { | ||
| final String consumer = consumersToFill.poll(); | ||
| final List<TaskId> threadAssignment = assignment.get(consumer); | ||
| threadAssignment.add(task); | ||
| if (threadAssignment.size() < minStatefulTasksPerThread) { | ||
| final int threadTaskCount = threadAssignment.size() + threadLoad.getOrDefault(consumer, 0); | ||
| if (threadTaskCount < minTasksPerThread) { | ||
| consumersToFill.offer(consumer); | ||
| } | ||
| } else { | ||
| throw new TaskAssignmentException("Ran out of unassigned stateful tasks but some members were not at capacity"); | ||
| } | ||
| } | ||
|
|
||
| // At this point all consumers are at the min capacity, so there may be up to N - 1 unassigned | ||
| // stateful tasks still remaining that should now be distributed over the consumers | ||
| if (!unassignedStatefulTasks.isEmpty()) { | ||
| consumersToFill.addAll(consumers); | ||
| // At this point all consumers are at the min or min + 1 capacity. | ||
| // The min + 1 case can occur for standbys where there are fewer standbys than consumers and after assigning | ||
| // the active tasks some consumers already have min + 1 tasks assigned. | ||
| // The tasks still remaining should now be distributed over the consumers that are still at min capacity | ||
|
Comment on lines
+1116
to
+1119
Member
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Thanks for your explanation. I've got it now, because we've changed the total tasks count from the original |
||
| if (!unassignedTasks.isEmpty()) { | ||
| for (final String consumer : consumers) { | ||
| final int taskCount = assignment.get(consumer).size() + threadLoad.getOrDefault(consumer, 0); | ||
| if (taskCount == minTasksPerThread) { | ||
| consumersToFill.add(consumer); | ||
|
Comment on lines
+1123
to
+1124
Member
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. I think the original algorithm assumes that when reaching this step, all consumers are at the min capacity. So it could just do this to set if (!unassignedStatefulTasks.isEmpty())
consumersToFill.addAll(consumers); But I didn't see where we changed this algorithm. Please let me know what I missed. Thanks.
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. As commented above |
||
| } | ||
| } | ||
|
|
||
| // Go over the tasks we skipped earlier and assign them to their previous owner when possible | ||
| for (final Map.Entry<TaskId, String> taskEntry : unassignedTaskToPreviousOwner.entrySet()) { | ||
| final TaskId task = taskEntry.getKey(); | ||
| final String consumer = taskEntry.getValue(); | ||
| if (consumersToFill.contains(consumer) && unassignedStatefulTasks.contains(task)) { | ||
| if (consumersToFill.contains(consumer) && unassignedTasks.contains(task)) { | ||
| assignment.get(consumer).add(task); | ||
| unassignedStatefulTasks.remove(task); | ||
| unassignedTasks.remove(task); | ||
| // Remove this consumer since we know it is now at minCapacity + 1 | ||
| consumersToFill.remove(consumer); | ||
| } | ||
| } | ||
|
|
||
| // Now just distribute the remaining unassigned stateful tasks over the consumers still at min capacity | ||
| for (final TaskId task : unassignedStatefulTasks) { | ||
| for (final TaskId task : unassignedTasks) { | ||
| final String consumer = consumersToFill.poll(); | ||
| final List<TaskId> threadAssignment = assignment.get(consumer); | ||
| threadAssignment.add(task); | ||
| } | ||
|
|
||
|
|
||
| // There must be at least one consumer still at min capacity while all the others are at min | ||
| // capacity + 1, so start distributing stateless tasks to get all consumers back to the same count | ||
| while (unassignedStatelessTasksIter.hasNext()) { | ||
| final String consumer = consumersToFill.poll(); | ||
| if (consumer != null) { | ||
| final TaskId task = unassignedStatelessTasksIter.next(); | ||
| unassignedStatelessTasksIter.remove(); | ||
| assignment.get(consumer).add(task); | ||
| } else { | ||
| break; | ||
| } | ||
| } | ||
| } | ||
| } | ||
|
|
||
| // Now just distribute tasks while circling through all the consumers | ||
| consumersToFill.addAll(consumers); | ||
|
|
||
| while (unassignedStatelessTasksIter.hasNext()) { | ||
| final TaskId task = unassignedStatelessTasksIter.next(); | ||
| final String consumer = consumersToFill.poll(); | ||
| assignment.get(consumer).add(task); | ||
| consumersToFill.offer(consumer); | ||
| // Update threadLoad | ||
| for (final Map.Entry<String, List<TaskId>> taskEntry : assignment.entrySet()) { | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Could we skip this threadLoad update for stateless tasks?
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. My only worry with that is that to get a proper balance it would rely on the caller to always assign stateless tasks last which might not be clear to the caller.
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Make sense to me. We can keep it as is. Thanks. |
||
| final String consumer = taskEntry.getKey(); | ||
| final int totalCount = threadLoad.getOrDefault(consumer, 0) + taskEntry.getValue().size(); | ||
| threadLoad.put(consumer, totalCount); | ||
| } | ||
|
|
||
| return assignment; | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
could we add a comment here to mention this is to combine the active stateful assignment + active stateless assignment...?