From 2acb7d06536ebe69a6124210b956d0e2f9ab2678 Mon Sep 17 00:00:00 2001 From: Tim Patterson Date: Fri, 12 Nov 2021 13:44:43 +1300 Subject: [PATCH 1/6] KAFKA-12959 Balance standby and active stateful tasks evenly across threads --- .../internals/StreamsPartitionAssignor.java | 153 ++++++++++++------ .../StreamsPartitionAssignorTest.java | 103 ++++++++++-- 2 files changed, 195 insertions(+), 61 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java index 2ae381d2f1eed..a78fc6d173496 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java @@ -784,21 +784,33 @@ private Map computeNewAssignment(final Set statefulT final ClientMetadata clientMetadata = clientEntry.getValue(); final ClientState state = clientMetadata.state; final SortedSet consumers = clientMetadata.consumers; + final Map threadTaskCounts = new HashMap<>(); - final Map> activeTaskAssignment = assignTasksToThreads( + final Map> activeTaskStatefulAssignment = assignStatefulTasksToThreads( state.statefulActiveTasks(), - state.statelessActiveTasks(), consumers, - state + state, + threadTaskCounts ); - final Map> standbyTaskAssignment = assignTasksToThreads( + final Map> standbyTaskAssignment = assignStatefulTasksToThreads( state.standbyTasks(), - Collections.emptySet(), consumers, - state + state, + threadTaskCounts + ); + + final Map> activeTaskStatelessAssignment = assignStatelessTasksToThreads( + state.statelessActiveTasks(), + consumers, + threadTaskCounts ); + final Map> activeTaskAssignment = activeTaskStatefulAssignment; + for (final Map.Entry> threadEntry : activeTaskStatelessAssignment.entrySet()) { + activeTaskAssignment.get(threadEntry.getKey()).addAll(threadEntry.getValue()); + } 
+ // Arbitrarily choose the leader's client to be responsible for triggering the probing rebalance, // note once we pick the first consumer within the process to trigger probing rebalance, other consumer // would not set to trigger any more. @@ -1029,60 +1041,65 @@ private Map> buildStandbyTaskMap(final String consum /** * Generate an assignment that tries to preserve thread-level stickiness of stateful tasks without violating - * balance. The stateful and total task load are both balanced across threads. Tasks without previous owners - * will be interleaved by group id to spread subtopologies across threads and further balance the workload. + * balance. The tasks are balanced across threads. Tasks without previous owners will be interleaved by + * group id to spread subtopologies across threads and further balance the workload. + * threadLoad is a map that keeps track of task load per thread across multiple calls so actives and standbys + * are evenly distributed */ - static Map> assignTasksToThreads(final Collection statefulTasksToAssign, - final Collection statelessTasksToAssign, - final SortedSet consumers, - final ClientState state) { + static Map> assignStatefulTasksToThreads(final Collection tasksToAssign, + final SortedSet consumers, + final ClientState state, + final Map threadLoad) { final Map> assignment = new HashMap<>(); for (final String consumer : consumers) { assignment.put(consumer, new ArrayList<>()); } - final List unassignedStatelessTasks = new ArrayList<>(statelessTasksToAssign); - Collections.sort(unassignedStatelessTasks); - - final Iterator unassignedStatelessTasksIter = unassignedStatelessTasks.iterator(); + int totalTasks = tasksToAssign.size(); + for (final Integer threadTaskCount : threadLoad.values()) { + totalTasks += threadTaskCount; + } - final int minStatefulTasksPerThread = (int) Math.floor(((double) statefulTasksToAssign.size()) / consumers.size()); - final PriorityQueue unassignedStatefulTasks = new 
PriorityQueue<>(statefulTasksToAssign); + final int minTasksPerThread = (int) Math.floor(((double) totalTasks) / consumers.size()); + final PriorityQueue unassignedTasks = new PriorityQueue<>(tasksToAssign); final Queue consumersToFill = new LinkedList<>(); // keep track of tasks that we have to skip during the first pass in case we can reassign them later // using tree-map to make sure the iteration ordering over keys are preserved final Map unassignedTaskToPreviousOwner = new TreeMap<>(); - if (!unassignedStatefulTasks.isEmpty()) { - // First assign stateful tasks to previous owner, up to the min expected tasks/thread + if (!unassignedTasks.isEmpty()) { + // First assign tasks to previous owner, up to the min expected tasks/thread for (final String consumer : consumers) { final List threadAssignment = assignment.get(consumer); for (final TaskId task : state.prevTasksByLag(consumer)) { - if (unassignedStatefulTasks.contains(task)) { - if (threadAssignment.size() < minStatefulTasksPerThread) { + if (unassignedTasks.contains(task)) { + final int threadTaskCount = threadAssignment.size() + threadLoad.getOrDefault(consumer, 0); + if (threadTaskCount < minTasksPerThread) { threadAssignment.add(task); - unassignedStatefulTasks.remove(task); + unassignedTasks.remove(task); } else { unassignedTaskToPreviousOwner.put(task, consumer); } } } - if (threadAssignment.size() < minStatefulTasksPerThread) { + final int threadTaskCount = threadAssignment.size() + threadLoad.getOrDefault(consumer, 0); + if (threadTaskCount < minTasksPerThread) { consumersToFill.offer(consumer); } } // Next interleave remaining unassigned tasks amongst unfilled consumers while (!consumersToFill.isEmpty()) { - final TaskId task = unassignedStatefulTasks.poll(); + final TaskId task = unassignedTasks.poll(); if (task != null) { final String consumer = consumersToFill.poll(); final List threadAssignment = assignment.get(consumer); threadAssignment.add(task); - if (threadAssignment.size() < 
minStatefulTasksPerThread) { + final int threadTaskCount = threadAssignment.size() + threadLoad.getOrDefault(consumer, 0); + if (threadTaskCount < minTasksPerThread) { consumersToFill.offer(consumer); } } else { @@ -1090,43 +1107,82 @@ static Map> assignTasksToThreads(final Collection s } } - // At this point all consumers are at the min capacity, so there may be up to N - 1 unassigned - // stateful tasks still remaining that should now be distributed over the consumers - if (!unassignedStatefulTasks.isEmpty()) { - consumersToFill.addAll(consumers); + // At this point all consumers are at the min or min + 1 capacity, + // the tasks still remaining that should now be distributed over the consumers that are still + // at min capacity + if (!unassignedTasks.isEmpty()) { + for (final String consumer : consumers) { + final int taskCount = assignment.get(consumer).size() + threadLoad.getOrDefault(consumer, 0); + if (taskCount == minTasksPerThread) { + consumersToFill.add(consumer); + } + } // Go over the tasks we skipped earlier and assign them to their previous owner when possible for (final Map.Entry taskEntry : unassignedTaskToPreviousOwner.entrySet()) { final TaskId task = taskEntry.getKey(); final String consumer = taskEntry.getValue(); - if (consumersToFill.contains(consumer) && unassignedStatefulTasks.contains(task)) { + if (consumersToFill.contains(consumer) && unassignedTasks.contains(task)) { assignment.get(consumer).add(task); - unassignedStatefulTasks.remove(task); + unassignedTasks.remove(task); // Remove this consumer since we know it is now at minCapacity + 1 consumersToFill.remove(consumer); } } // Now just distribute the remaining unassigned stateful tasks over the consumers still at min capacity - for (final TaskId task : unassignedStatefulTasks) { + for (final TaskId task : unassignedTasks) { final String consumer = consumersToFill.poll(); final List threadAssignment = assignment.get(consumer); threadAssignment.add(task); } + } + } + // Update 
threadLoad + for (final Map.Entry> taskEntry : assignment.entrySet()) { + final String consumer = taskEntry.getKey(); + final int totalCount = threadLoad.getOrDefault(consumer, 0) + taskEntry.getValue().size(); + threadLoad.put(consumer, totalCount); + } + return assignment; + } - // There must be at least one consumer still at min capacity while all the others are at min - // capacity + 1, so start distributing stateless tasks to get all consumers back to the same count - while (unassignedStatelessTasksIter.hasNext()) { - final String consumer = consumersToFill.poll(); - if (consumer != null) { - final TaskId task = unassignedStatelessTasksIter.next(); - unassignedStatelessTasksIter.remove(); - assignment.get(consumer).add(task); - } else { - break; - } - } + static Map> assignStatelessTasksToThreads(final Collection statelessTasksToAssign, + final SortedSet consumers, + final Map threadLoad) { + final List tasksToAssign = new ArrayList<>(statelessTasksToAssign); + Collections.sort(tasksToAssign); + final Map> assignment = new HashMap<>(); + for (final String consumer : consumers) { + assignment.put(consumer, new ArrayList<>()); + } + + int maxThreadLoad = 0; + for (final int load : threadLoad.values()) { + maxThreadLoad = Integer.max(maxThreadLoad, load); + } + + final Queue consumersToFill = new LinkedList<>(); + + for (final String consumer : consumers) { + if (threadLoad.getOrDefault(consumer, 0) < maxThreadLoad) { + consumersToFill.offer(consumer); + } + } + + final Iterator unassignedStatelessTasksIter = tasksToAssign.iterator(); + + // There must be at least one consumer still at min capacity while all the others are at min + // capacity + 1, so start distributing stateless tasks to get all consumers back to the same count + while (unassignedStatelessTasksIter.hasNext()) { + final String consumer = consumersToFill.poll(); + if (consumer != null) { + final TaskId task = unassignedStatelessTasksIter.next(); + unassignedStatelessTasksIter.remove(); + 
assignment.get(consumer).add(task); + } else { + break; } } @@ -1140,6 +1196,13 @@ static Map> assignTasksToThreads(final Collection s consumersToFill.offer(consumer); } + // Update threadLoad + for (final Map.Entry> taskEntry : assignment.entrySet()) { + final String consumer = taskEntry.getKey(); + final int totalCount = threadLoad.getOrDefault(consumer, 0) + taskEntry.getValue().size(); + threadLoad.put(consumer, totalCount); + } + return assignment; } diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java index 24bf5b86a757b..fdb122430f19e 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java @@ -103,7 +103,7 @@ import static org.apache.kafka.common.utils.Utils.mkMap; import static org.apache.kafka.common.utils.Utils.mkSet; import static org.apache.kafka.common.utils.Utils.mkSortedSet; -import static org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor.assignTasksToThreads; +import static org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor.assignStatefulTasksToThreads; import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.EMPTY_CHANGELOG_END_OFFSETS; import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.EMPTY_TASKS; import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.TASK_0_0; @@ -317,11 +317,11 @@ public void shouldProduceStickyAndBalancedAssignmentWhenNothingChanges() { assertEquivalentAssignment( previousAssignment, - assignTasksToThreads( + assignStatefulTasksToThreads( allTasks, - emptySet(), consumers, - state + state, + new HashMap<>() ) ); } @@ -351,11 +351,11 @@ public void 
shouldProduceStickyAndBalancedAssignmentWhenNewTasksAreAdded() { state.assignActiveTasks(allTasks); final Map> newAssignment = - assignTasksToThreads( + assignStatefulTasksToThreads( allTasks, - emptySet(), consumers, - state + state, + new HashMap<>() ); previousAssignment.get(CONSUMER_2).add(newTask); @@ -384,11 +384,11 @@ public void shouldProduceMaximallyStickyAssignmentWhenMemberLeaves() { // Consumer 3 leaves the group consumers.remove(CONSUMER_3); - final Map> assignment = assignTasksToThreads( + final Map> assignment = assignStatefulTasksToThreads( allTasks, - emptySet(), consumers, - state + state, + new HashMap<>() ); // Each member should have all of its previous tasks reassigned plus some of consumer 3's tasks @@ -424,11 +424,11 @@ public void shouldProduceStickyEnoughAssignmentWhenNewMemberJoins() { state.initializePrevTasks(emptyMap()); state.computeTaskLags(UUID_1, getTaskEndOffsetSums(allTasks)); - final Map> assignment = assignTasksToThreads( + final Map> assignment = assignStatefulTasksToThreads( allTasks, - emptySet(), consumers, - state + state, + new HashMap<>() ); // we should move one task each from consumer 1 and consumer 3 to the new member, and none from consumer 2 @@ -464,11 +464,11 @@ public void shouldInterleaveTasksByGroupIdDuringNewAssignment() { Collections.shuffle(allTasks); final Map> interleavedTaskIds = - assignTasksToThreads( + assignStatefulTasksToThreads( allTasks, - emptySet(), consumers, - state + state, + new HashMap<>() ); assertThat(interleavedTaskIds, equalTo(assignment)); @@ -997,6 +997,77 @@ public void testAssignWithStandbyReplicas() { assertEquals(standbyPartitionsByHost, info20.standbyPartitionByHost()); } + @Test + public void testAssignWithStandbyReplicasBalance() { + builder.addSource(null, "source1", null, null, null, "topic1"); + builder.addProcessor("processor", new MockApiProcessorSupplier<>(), "source1"); + builder.addStateStore(new MockKeyValueStoreBuilder("store1", false), "processor"); + + final List 
topics = asList("topic1"); + + final Set prevTasks00 = mkSet(TASK_0_0); + final Set standbyTasks = mkSet(TASK_0_0, TASK_0_1, TASK_0_2); + + createMockTaskManager(prevTasks00, standbyTasks); + adminClient = createMockAdminClientForAssignor(getTopicPartitionOffsetsMap( + singletonList(APPLICATION_ID + "-store1-changelog"), + singletonList(3)) + ); + configurePartitionAssignorWith(Collections.singletonMap(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1)); + + subscriptions.put("consumer10", + new Subscription( + topics, + getInfo(UUID_1, EMPTY_TASKS, EMPTY_TASKS, USER_END_POINT).encode())); + subscriptions.put("consumer11", + new Subscription( + emptyList(), + getInfo(UUID_1, EMPTY_TASKS, EMPTY_TASKS, USER_END_POINT).encode())); + subscriptions.put("consumer12", + new Subscription( + emptyList(), + getInfo(UUID_1, EMPTY_TASKS, EMPTY_TASKS, USER_END_POINT).encode())); + subscriptions.put("consumer13", + new Subscription( + emptyList(), + getInfo(UUID_1, EMPTY_TASKS, EMPTY_TASKS, USER_END_POINT).encode())); + subscriptions.put("consumer20", + new Subscription( + topics, + getInfo(UUID_2, EMPTY_TASKS, EMPTY_TASKS, OTHER_END_POINT).encode())); + subscriptions.put("consumer21", + new Subscription( + topics, + getInfo(UUID_2, EMPTY_TASKS, EMPTY_TASKS, OTHER_END_POINT).encode())); + subscriptions.put("consumer22", + new Subscription( + topics, + getInfo(UUID_2, EMPTY_TASKS, EMPTY_TASKS, OTHER_END_POINT).encode())); + + + final Map assignments = + partitionAssignor.assign(metadata, new GroupSubscription(subscriptions)).groupAssignment(); + + // Consumers + final AssignmentInfo info10 = AssignmentInfo.decode(assignments.get("consumer10").userData()); + final AssignmentInfo info11 = AssignmentInfo.decode(assignments.get("consumer11").userData()); + final AssignmentInfo info12 = AssignmentInfo.decode(assignments.get("consumer12").userData()); + final AssignmentInfo info13 = AssignmentInfo.decode(assignments.get("consumer13").userData()); + final AssignmentInfo info20 = 
AssignmentInfo.decode(assignments.get("consumer20").userData()); + final AssignmentInfo info21 = AssignmentInfo.decode(assignments.get("consumer21").userData()); + final AssignmentInfo info22 = AssignmentInfo.decode(assignments.get("consumer22").userData()); + + // Check each consumer has no more than 1 task + // (client 1 has more consumers than needed so consumer13 won't get a task) + assertEquals(1, info10.activeTasks().size() + info10.standbyTasks().size()); + assertEquals(1, info11.activeTasks().size() + info11.standbyTasks().size()); + assertEquals(1, info12.activeTasks().size() + info12.standbyTasks().size()); + assertEquals(0, info13.activeTasks().size() + info13.standbyTasks().size()); + assertEquals(1, info20.activeTasks().size() + info20.standbyTasks().size()); + assertEquals(1, info21.activeTasks().size() + info21.standbyTasks().size()); + assertEquals(1, info22.activeTasks().size() + info22.standbyTasks().size()); + } + @Test public void testOnAssignment() { taskManager = EasyMock.createStrictMock(TaskManager.class); From b7fc6e9b8dffc3b5d73989b1ef01afcaa7bf6aa8 Mon Sep 17 00:00:00 2001 From: Tim Patterson Date: Thu, 17 Feb 2022 22:50:40 +1300 Subject: [PATCH 2/6] [KAFKA-12959] Minor tweaks due to code review feedback --- .../internals/StreamsPartitionAssignor.java | 26 ++++---- .../StreamsPartitionAssignorTest.java | 62 +++++++------------ 2 files changed, 34 insertions(+), 54 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java index a78fc6d173496..f461a5ef1c089 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java @@ -806,6 +806,8 @@ private Map computeNewAssignment(final Set statefulT threadTaskCounts ); + // Combine 
activeTaskStatefulAssignment and activeTaskStatelessAssignment together into + // activeTaskAssignment final Map> activeTaskAssignment = activeTaskStatefulAssignment; for (final Map.Entry> threadEntry : activeTaskStatelessAssignment.entrySet()) { activeTaskAssignment.get(threadEntry.getKey()).addAll(threadEntry.getValue()); @@ -1072,11 +1074,13 @@ static Map> assignStatefulTasksToThreads(final Collection threadAssignment = assignment.get(consumer); + // The number of tasks we have to assign here to hit minTasksPerThread + final int tasksTargetCount = minTasksPerThread - threadLoad.getOrDefault(consumer, 0); for (final TaskId task : state.prevTasksByLag(consumer)) { if (unassignedTasks.contains(task)) { - final int threadTaskCount = threadAssignment.size() + threadLoad.getOrDefault(consumer, 0); - if (threadTaskCount < minTasksPerThread) { + final int threadTaskCount = threadAssignment.size(); + if (threadTaskCount < tasksTargetCount) { threadAssignment.add(task); unassignedTasks.remove(task); } else { @@ -1085,8 +1089,8 @@ static Map> assignStatefulTasksToThreads(final Collection> assignStatefulTasksToThreads(final Collection> assignStatelessTasksToThreads(final Collection< consumersToFill.offer(consumer); } - // Update threadLoad - for (final Map.Entry> taskEntry : assignment.entrySet()) { - final String consumer = taskEntry.getKey(); - final int totalCount = threadLoad.getOrDefault(consumer, 0) + taskEntry.getValue().size(); - threadLoad.put(consumer, totalCount); - } - return assignment; } diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java index fdb122430f19e..266d9f6200e9b 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java
@@ -1005,45 +1005,28 @@ public void testAssignWithStandbyReplicasBalance() { final List topics = asList("topic1"); - final Set prevTasks00 = mkSet(TASK_0_0); - final Set standbyTasks = mkSet(TASK_0_0, TASK_0_1, TASK_0_2); - - createMockTaskManager(prevTasks00, standbyTasks); + createMockTaskManager(EMPTY_TASKS, EMPTY_TASKS); adminClient = createMockAdminClientForAssignor(getTopicPartitionOffsetsMap( singletonList(APPLICATION_ID + "-store1-changelog"), singletonList(3)) ); configurePartitionAssignorWith(Collections.singletonMap(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1)); - subscriptions.put("consumer10", - new Subscription( - topics, - getInfo(UUID_1, EMPTY_TASKS, EMPTY_TASKS, USER_END_POINT).encode())); - subscriptions.put("consumer11", - new Subscription( - emptyList(), - getInfo(UUID_1, EMPTY_TASKS, EMPTY_TASKS, USER_END_POINT).encode())); - subscriptions.put("consumer12", - new Subscription( - emptyList(), - getInfo(UUID_1, EMPTY_TASKS, EMPTY_TASKS, USER_END_POINT).encode())); - subscriptions.put("consumer13", - new Subscription( - emptyList(), - getInfo(UUID_1, EMPTY_TASKS, EMPTY_TASKS, USER_END_POINT).encode())); - subscriptions.put("consumer20", - new Subscription( - topics, - getInfo(UUID_2, EMPTY_TASKS, EMPTY_TASKS, OTHER_END_POINT).encode())); - subscriptions.put("consumer21", - new Subscription( - topics, - getInfo(UUID_2, EMPTY_TASKS, EMPTY_TASKS, OTHER_END_POINT).encode())); - subscriptions.put("consumer22", - new Subscription( - topics, - getInfo(UUID_2, EMPTY_TASKS, EMPTY_TASKS, OTHER_END_POINT).encode())); + final List client1Consumers = asList("consumer10", "consumer11", "consumer12", "consumer13"); + final List client2Consumers = asList("consumer20", "consumer21", "consumer22"); + for (final String consumerId : client1Consumers) { + subscriptions.put(consumerId, + new Subscription( + topics, + getInfo(UUID_1, EMPTY_TASKS, EMPTY_TASKS, USER_END_POINT).encode())); + } + for (final String consumerId : client2Consumers) { + 
subscriptions.put(consumerId, + new Subscription( + topics, + getInfo(UUID_2, EMPTY_TASKS, EMPTY_TASKS, USER_END_POINT).encode())); + } final Map assignments = partitionAssignor.assign(metadata, new GroupSubscription(subscriptions)).groupAssignment(); @@ -1058,14 +1041,13 @@ public void testAssignWithStandbyReplicasBalance() { final AssignmentInfo info22 = AssignmentInfo.decode(assignments.get("consumer22").userData()); // Check each consumer has no more than 1 task - // (client 1 has more consumers than needed so consumer13 won't get a task) - assertEquals(1, info10.activeTasks().size() + info10.standbyTasks().size()); - assertEquals(1, info11.activeTasks().size() + info11.standbyTasks().size()); - assertEquals(1, info12.activeTasks().size() + info12.standbyTasks().size()); - assertEquals(0, info13.activeTasks().size() + info13.standbyTasks().size()); - assertEquals(1, info20.activeTasks().size() + info20.standbyTasks().size()); - assertEquals(1, info21.activeTasks().size() + info21.standbyTasks().size()); - assertEquals(1, info22.activeTasks().size() + info22.standbyTasks().size()); + assertTrue(info10.activeTasks().size() + info10.standbyTasks().size() <= 1); + assertTrue(info11.activeTasks().size() + info11.standbyTasks().size() <= 1); + assertTrue(info12.activeTasks().size() + info12.standbyTasks().size() <= 1); + assertTrue(info13.activeTasks().size() + info13.standbyTasks().size() <= 1); + assertTrue(info20.activeTasks().size() + info20.standbyTasks().size() <= 1); + assertTrue(info21.activeTasks().size() + info21.standbyTasks().size() <= 1); + assertTrue(info22.activeTasks().size() + info22.standbyTasks().size() <= 1); } @Test From 34acba57710947b43962d6e87ad552b1424567cf Mon Sep 17 00:00:00 2001 From: Tim Patterson Date: Sun, 20 Feb 2022 21:48:25 +1300 Subject: [PATCH 3/6] [KAFKA-12959] Add tests and code review feedback --- .../internals/StreamsPartitionAssignor.java | 6 +- .../StreamsPartitionAssignorTest.java | 95 ++++++++++++++++++- 2 files changed, 96 
insertions(+), 5 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java index f461a5ef1c089..5b3931764dedb 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java @@ -1079,8 +1079,7 @@ static Map> assignStatefulTasksToThreads(final Collection> assignStatefulTasksToThreads(final Collection(), "source1"); builder.addStateStore(new MockKeyValueStoreBuilder("store1", false), "processor"); @@ -1050,6 +1050,99 @@ public void testAssignWithStandbyReplicasBalance() { assertTrue(info22.activeTasks().size() + info22.standbyTasks().size() <= 1); } + @Test + public void testAssignWithStandbyReplicasBalanceDense() { + builder.addSource(null, "source1", null, null, null, "topic1"); + builder.addProcessor("processor", new MockApiProcessorSupplier<>(), "source1"); + builder.addStateStore(new MockKeyValueStoreBuilder("store1", false), "processor"); + + final List topics = asList("topic1"); + + createMockTaskManager(EMPTY_TASKS, EMPTY_TASKS); + adminClient = createMockAdminClientForAssignor(getTopicPartitionOffsetsMap( + singletonList(APPLICATION_ID + "-store1-changelog"), + singletonList(3)) + ); + configurePartitionAssignorWith(Collections.singletonMap(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1)); + + subscriptions.put("consumer10", + new Subscription( + topics, + getInfo(UUID_1, EMPTY_TASKS, EMPTY_TASKS, USER_END_POINT).encode())); + subscriptions.put("consumer20", + new Subscription( + topics, + getInfo(UUID_2, EMPTY_TASKS, EMPTY_TASKS, USER_END_POINT).encode())); + + final Map assignments = + partitionAssignor.assign(metadata, new GroupSubscription(subscriptions)).groupAssignment(); + + // Consumers + final AssignmentInfo info10 = 
AssignmentInfo.decode(assignments.get("consumer10").userData()); + final AssignmentInfo info20 = AssignmentInfo.decode(assignments.get("consumer20").userData()); + + // Check each consumer has 3 tasks + assertEquals(3, info10.activeTasks().size() + info10.standbyTasks().size()); + assertEquals(3, info20.activeTasks().size() + info20.standbyTasks().size()); + // Check that not all the actives are on one node + assertTrue(info10.activeTasks().size() < 3); + assertTrue(info20.activeTasks().size() < 3); + } + + @Test + public void testAssignWithStandbyReplicasBalanceWithStatelessTasks() { + builder.addSource(null, "source1", null, null, null, "topic1"); + builder.addProcessor("processor_with_state", new MockApiProcessorSupplier<>(), "source1"); + builder.addProcessor("processor", new MockApiProcessorSupplier<>(), "source1"); + builder.addStateStore(new MockKeyValueStoreBuilder("store1", false), "processor_with_state"); + + final List topics = asList("topic1"); + + createMockTaskManager(EMPTY_TASKS, EMPTY_TASKS); + adminClient = createMockAdminClientForAssignor(getTopicPartitionOffsetsMap( + singletonList(APPLICATION_ID + "-store1-changelog"), + singletonList(3)) + ); + configurePartitionAssignorWith(Collections.singletonMap(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1)); + + subscriptions.put("consumer10", + new Subscription( + topics, + getInfo(UUID_1, EMPTY_TASKS, EMPTY_TASKS, USER_END_POINT).encode())); + subscriptions.put("consumer11", + new Subscription( + topics, + getInfo(UUID_1, EMPTY_TASKS, EMPTY_TASKS, USER_END_POINT).encode())); + subscriptions.put("consumer20", + new Subscription( + topics, + getInfo(UUID_2, EMPTY_TASKS, EMPTY_TASKS, USER_END_POINT).encode())); + subscriptions.put("consumer21", + new Subscription( + topics, + getInfo(UUID_2, EMPTY_TASKS, EMPTY_TASKS, USER_END_POINT).encode())); + + final Map assignments = + partitionAssignor.assign(metadata, new GroupSubscription(subscriptions)).groupAssignment(); + + // Consumers + final AssignmentInfo 
info10 = AssignmentInfo.decode(assignments.get("consumer10").userData()); + final AssignmentInfo info11 = AssignmentInfo.decode(assignments.get("consumer11").userData()); + final AssignmentInfo info20 = AssignmentInfo.decode(assignments.get("consumer20").userData()); + final AssignmentInfo info21 = AssignmentInfo.decode(assignments.get("consumer21").userData()); + + // 9 tasks spread over 4 consumers, so we should have no more than 3 tasks per consumer + assertTrue(info10.activeTasks().size() + info10.standbyTasks().size() <= 3); + assertTrue(info11.activeTasks().size() + info11.standbyTasks().size() <= 3); + assertTrue(info20.activeTasks().size() + info20.standbyTasks().size() <= 3); + assertTrue(info21.activeTasks().size() + info21.standbyTasks().size() <= 3); + // No more than 1 standby per node. + assertTrue(info10.standbyTasks().size() <= 1); + assertTrue(info11.standbyTasks().size() <= 1); + assertTrue(info20.standbyTasks().size() <= 1); + assertTrue(info21.standbyTasks().size() <= 1); + } + @Test public void testOnAssignment() { taskManager = EasyMock.createStrictMock(TaskManager.class); From 5f186a39e394d12c0899e06e8fd7dccdfd74eb35 Mon Sep 17 00:00:00 2001 From: Tim Patterson Date: Sun, 20 Feb 2022 22:38:44 +1300 Subject: [PATCH 4/6] [KAFKA-12959] get rid of assignStatelessTasksToThreads method --- .../internals/StreamsPartitionAssignor.java | 93 +++++-------------- .../StreamsPartitionAssignorTest.java | 17 ++-- 2 files changed, 35 insertions(+), 75 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java index 5b3931764dedb..530f9e9157648 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java @@ -60,7 +60,6 @@ import java.util.Comparator; 
import java.util.HashMap; import java.util.HashSet; -import java.util.Iterator; import java.util.LinkedList; import java.util.List; import java.util.Map; @@ -786,23 +785,27 @@ private Map computeNewAssignment(final Set statefulT final SortedSet consumers = clientMetadata.consumers; final Map threadTaskCounts = new HashMap<>(); - final Map> activeTaskStatefulAssignment = assignStatefulTasksToThreads( + final Map> activeTaskStatefulAssignment = assignTasksToThreads( state.statefulActiveTasks(), + true, consumers, state, threadTaskCounts ); - final Map> standbyTaskAssignment = assignStatefulTasksToThreads( + final Map> standbyTaskAssignment = assignTasksToThreads( state.standbyTasks(), + true, consumers, state, threadTaskCounts ); - final Map> activeTaskStatelessAssignment = assignStatelessTasksToThreads( + final Map> activeTaskStatelessAssignment = assignTasksToThreads( state.statelessActiveTasks(), + false, consumers, + state, threadTaskCounts ); @@ -1042,16 +1045,17 @@ private Map> buildStandbyTaskMap(final String consum } /** - * Generate an assignment that tries to preserve thread-level stickiness of stateful tasks without violating - * balance. The tasks are balanced across threads. Tasks without previous owners will be interleaved by + * Generate an assignment that tries to preserve thread-level stickiness for stateful tasks without violating + * balance. The tasks are balanced across threads. Stateful tasks without previous owners will be interleaved by * group id to spread subtopologies across threads and further balance the workload. 
* threadLoad is a map that keeps track of task load per thread across multiple calls so actives and standbys * are evenly distributed */ - static Map> assignStatefulTasksToThreads(final Collection tasksToAssign, - final SortedSet consumers, - final ClientState state, - final Map threadLoad) { + static Map> assignTasksToThreads(final Collection tasksToAssign, + final boolean isStateful, + final SortedSet consumers, + final ClientState state, + final Map threadLoad) { final Map> assignment = new HashMap<>(); for (final String consumer : consumers) { assignment.put(consumer, new ArrayList<>()); @@ -1071,19 +1075,21 @@ static Map> assignStatefulTasksToThreads(final Collection unassignedTaskToPreviousOwner = new TreeMap<>(); if (!unassignedTasks.isEmpty()) { - // First assign tasks to previous owner, up to the min expected tasks/thread + // First assign tasks to previous owner, up to the min expected tasks/thread if these are stateful for (final String consumer : consumers) { final List threadAssignment = assignment.get(consumer); // The number of tasks we have to assign here to hit minTasksPerThread final int tasksTargetCount = minTasksPerThread - threadLoad.getOrDefault(consumer, 0); - for (final TaskId task : state.prevTasksByLag(consumer)) { - if (unassignedTasks.contains(task)) { - if (threadAssignment.size() < tasksTargetCount) { - threadAssignment.add(task); - unassignedTasks.remove(task); - } else { - unassignedTaskToPreviousOwner.put(task, consumer); + if (isStateful) { + for (final TaskId task : state.prevTasksByLag(consumer)) { + if (unassignedTasks.contains(task)) { + if (threadAssignment.size() < tasksTargetCount) { + threadAssignment.add(task); + unassignedTasks.remove(task); + } else { + unassignedTaskToPreviousOwner.put(task, consumer); + } } } } @@ -1151,57 +1157,6 @@ static Map> assignStatefulTasksToThreads(final Collection> assignStatelessTasksToThreads(final Collection statelessTasksToAssign, - final SortedSet consumers, - final Map threadLoad) { - 
final List tasksToAssign = new ArrayList<>(statelessTasksToAssign); - Collections.sort(tasksToAssign); - final Map> assignment = new HashMap<>(); - for (final String consumer : consumers) { - assignment.put(consumer, new ArrayList<>()); - } - - int maxThreadLoad = 0; - for (final int load : threadLoad.values()) { - maxThreadLoad = Integer.max(maxThreadLoad, load); - } - - final Queue consumersToFill = new LinkedList<>(); - - for (final String consumer : consumers) { - if (threadLoad.getOrDefault(consumer, 0) < maxThreadLoad) { - consumersToFill.offer(consumer); - } - } - - final Iterator unassignedStatelessTasksIter = tasksToAssign.iterator(); - - // There must be at least one consumer still at min capacity while all the others are at min - // capacity + 1, so start distributing stateless tasks to get all consumers back to the same count - while (unassignedStatelessTasksIter.hasNext()) { - final String consumer = consumersToFill.poll(); - if (consumer != null) { - final TaskId task = unassignedStatelessTasksIter.next(); - unassignedStatelessTasksIter.remove(); - assignment.get(consumer).add(task); - } else { - break; - } - } - - // Now just distribute tasks while circling through all the consumers - consumersToFill.addAll(consumers); - - while (unassignedStatelessTasksIter.hasNext()) { - final TaskId task = unassignedStatelessTasksIter.next(); - final String consumer = consumersToFill.poll(); - assignment.get(consumer).add(task); - consumersToFill.offer(consumer); - } - - return assignment; - } - private void validateMetadataVersions(final int receivedAssignmentMetadataVersion, final int latestCommonlySupportedVersion) { diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java index 3eea44b7f0a48..6812118d7e2d1 100644 --- 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java @@ -103,7 +103,7 @@ import static org.apache.kafka.common.utils.Utils.mkMap; import static org.apache.kafka.common.utils.Utils.mkSet; import static org.apache.kafka.common.utils.Utils.mkSortedSet; -import static org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor.assignStatefulTasksToThreads; +import static org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor.assignTasksToThreads; import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.EMPTY_CHANGELOG_END_OFFSETS; import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.EMPTY_TASKS; import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.TASK_0_0; @@ -317,8 +317,9 @@ public void shouldProduceStickyAndBalancedAssignmentWhenNothingChanges() { assertEquivalentAssignment( previousAssignment, - assignStatefulTasksToThreads( + assignTasksToThreads( allTasks, + true, consumers, state, new HashMap<>() @@ -351,8 +352,9 @@ public void shouldProduceStickyAndBalancedAssignmentWhenNewTasksAreAdded() { state.assignActiveTasks(allTasks); final Map> newAssignment = - assignStatefulTasksToThreads( + assignTasksToThreads( allTasks, + true, consumers, state, new HashMap<>() @@ -384,8 +386,9 @@ public void shouldProduceMaximallyStickyAssignmentWhenMemberLeaves() { // Consumer 3 leaves the group consumers.remove(CONSUMER_3); - final Map> assignment = assignStatefulTasksToThreads( + final Map> assignment = assignTasksToThreads( allTasks, + true, consumers, state, new HashMap<>() @@ -424,8 +427,9 @@ public void shouldProduceStickyEnoughAssignmentWhenNewMemberJoins() { state.initializePrevTasks(emptyMap()); state.computeTaskLags(UUID_1, getTaskEndOffsetSums(allTasks)); - final Map> assignment = 
assignStatefulTasksToThreads( + final Map> assignment = assignTasksToThreads( allTasks, + true, consumers, state, new HashMap<>() @@ -464,8 +468,9 @@ public void shouldInterleaveTasksByGroupIdDuringNewAssignment() { Collections.shuffle(allTasks); final Map> interleavedTaskIds = - assignStatefulTasksToThreads( + assignTasksToThreads( allTasks, + true, consumers, state, new HashMap<>() From 1d8b25d311577789221e7a38ee1d8943d9bf64dd Mon Sep 17 00:00:00 2001 From: Tim Patterson Date: Mon, 21 Feb 2022 23:11:47 +1300 Subject: [PATCH 5/6] [KAFKA-12959] Code review feedback --- .../processor/internals/StreamsPartitionAssignor.java | 8 +++----- .../processor/internals/StreamsPartitionAssignorTest.java | 6 ++++-- 2 files changed, 7 insertions(+), 7 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java index 530f9e9157648..d88947c4cba2a 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java @@ -1048,8 +1048,9 @@ private Map> buildStandbyTaskMap(final String consum * Generate an assignment that tries to preserve thread-level stickiness for stateful tasks without violating * balance. The tasks are balanced across threads. Stateful tasks without previous owners will be interleaved by * group id to spread subtopologies across threads and further balance the workload. + * Stateless tasks are simply spread across threads without taking into account previous ownership. 
* threadLoad is a map that keeps track of task load per thread across multiple calls so actives and standbys - * are evenly distributed + * tasks are evenly distributed */ static Map> assignTasksToThreads(final Collection tasksToAssign, final boolean isStateful, @@ -1061,10 +1062,7 @@ static Map> assignTasksToThreads(final Collection t assignment.put(consumer, new ArrayList<>()); } - int totalTasks = tasksToAssign.size(); - for (final Integer threadTaskCount : threadLoad.values()) { - totalTasks += threadTaskCount; - } + final int totalTasks = threadLoad.values().stream().reduce(tasksToAssign.size(), Integer::sum); final int minTasksPerThread = (int) Math.floor(((double) totalTasks) / consumers.size()); final PriorityQueue unassignedTasks = new PriorityQueue<>(tasksToAssign); diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java index 6812118d7e2d1..6f5a978a3c5a2 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java @@ -1098,10 +1098,12 @@ public void testAssignWithStandbyReplicasBalanceDense() { public void testAssignWithStandbyReplicasBalanceWithStatelessTasks() { builder.addSource(null, "source1", null, null, null, "topic1"); builder.addProcessor("processor_with_state", new MockApiProcessorSupplier<>(), "source1"); - builder.addProcessor("processor", new MockApiProcessorSupplier<>(), "source1"); builder.addStateStore(new MockKeyValueStoreBuilder("store1", false), "processor_with_state"); - final List topics = asList("topic1"); + builder.addSource(null, "source2", null, null, null, "topic2"); + builder.addProcessor("processor", new MockApiProcessorSupplier<>(), "source2"); + + final List topics = asList("topic1", "topic2"); 
createMockTaskManager(EMPTY_TASKS, EMPTY_TASKS); adminClient = createMockAdminClientForAssignor(getTopicPartitionOffsetsMap( From 9f2a4570965dcc4a13e7d88ebb5f9e174a74d465 Mon Sep 17 00:00:00 2001 From: Tim Patterson Date: Mon, 21 Feb 2022 23:33:55 +1300 Subject: [PATCH 6/6] [KAFKA-12959] Fix grammar --- .../streams/processor/internals/StreamsPartitionAssignor.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java index d88947c4cba2a..e2ce8fa5d65cf 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java @@ -1049,7 +1049,7 @@ private Map> buildStandbyTaskMap(final String consum * balance. The tasks are balanced across threads. Stateful tasks without previous owners will be interleaved by * group id to spread subtopologies across threads and further balance the workload. * Stateless tasks are simply spread across threads without taking into account previous ownership. - * threadLoad is a map that keeps track of task load per thread across multiple calls so actives and standbys + * threadLoad is a map that keeps track of task load per thread across multiple calls so active and standby * tasks are evenly distributed */ static Map> assignTasksToThreads(final Collection tasksToAssign,