diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java index 2ae381d2f1eed..e2ce8fa5d65cf 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java @@ -60,7 +60,6 @@ import java.util.Comparator; import java.util.HashMap; import java.util.HashSet; -import java.util.Iterator; import java.util.LinkedList; import java.util.List; import java.util.Map; @@ -784,21 +783,39 @@ private Map computeNewAssignment(final Set statefulT final ClientMetadata clientMetadata = clientEntry.getValue(); final ClientState state = clientMetadata.state; final SortedSet consumers = clientMetadata.consumers; + final Map threadTaskCounts = new HashMap<>(); - final Map> activeTaskAssignment = assignTasksToThreads( + final Map> activeTaskStatefulAssignment = assignTasksToThreads( state.statefulActiveTasks(), - state.statelessActiveTasks(), + true, consumers, - state + state, + threadTaskCounts ); final Map> standbyTaskAssignment = assignTasksToThreads( state.standbyTasks(), - Collections.emptySet(), + true, + consumers, + state, + threadTaskCounts + ); + + final Map> activeTaskStatelessAssignment = assignTasksToThreads( + state.statelessActiveTasks(), + false, consumers, - state + state, + threadTaskCounts ); + // Combine activeTaskStatefulAssignment and activeTaskStatelessAssignment together into + // activeTaskStatefulAssignment + final Map> activeTaskAssignment = activeTaskStatefulAssignment; + for (final Map.Entry> threadEntry : activeTaskStatelessAssignment.entrySet()) { + activeTaskAssignment.get(threadEntry.getKey()).addAll(threadEntry.getValue()); + } + // Arbitrarily choose the leader's client to be responsible for triggering the probing rebalance, // note once we pick the first 
consumer within the process to trigger probing rebalance, other consumer // would not set to trigger any more. @@ -1028,61 +1045,67 @@ private Map> buildStandbyTaskMap(final String consum } /** - * Generate an assignment that tries to preserve thread-level stickiness of stateful tasks without violating - * balance. The stateful and total task load are both balanced across threads. Tasks without previous owners - * will be interleaved by group id to spread subtopologies across threads and further balance the workload. + * Generate an assignment that tries to preserve thread-level stickiness for stateful tasks without violating + * balance. The tasks are balanced across threads. Stateful tasks without previous owners will be interleaved by + * group id to spread subtopologies across threads and further balance the workload. + * Stateless tasks are simply spread across threads without taking into account previous ownership. + * threadLoad is a map that keeps track of task load per thread across multiple calls so active and standby + * tasks are evenly distributed */ - static Map> assignTasksToThreads(final Collection statefulTasksToAssign, - final Collection statelessTasksToAssign, + static Map> assignTasksToThreads(final Collection tasksToAssign, + final boolean isStateful, final SortedSet consumers, - final ClientState state) { + final ClientState state, + final Map threadLoad) { final Map> assignment = new HashMap<>(); for (final String consumer : consumers) { assignment.put(consumer, new ArrayList<>()); } - final List unassignedStatelessTasks = new ArrayList<>(statelessTasksToAssign); - Collections.sort(unassignedStatelessTasks); - - final Iterator unassignedStatelessTasksIter = unassignedStatelessTasks.iterator(); + final int totalTasks = threadLoad.values().stream().reduce(tasksToAssign.size(), Integer::sum); - final int minStatefulTasksPerThread = (int) Math.floor(((double) statefulTasksToAssign.size()) / consumers.size()); - final PriorityQueue 
unassignedStatefulTasks = new PriorityQueue<>(statefulTasksToAssign); + final int minTasksPerThread = (int) Math.floor(((double) totalTasks) / consumers.size()); + final PriorityQueue unassignedTasks = new PriorityQueue<>(tasksToAssign); final Queue consumersToFill = new LinkedList<>(); // keep track of tasks that we have to skip during the first pass in case we can reassign them later // using tree-map to make sure the iteration ordering over keys are preserved final Map unassignedTaskToPreviousOwner = new TreeMap<>(); - if (!unassignedStatefulTasks.isEmpty()) { - // First assign stateful tasks to previous owner, up to the min expected tasks/thread + if (!unassignedTasks.isEmpty()) { + // First assign tasks to previous owner, up to the min expected tasks/thread if these are stateful for (final String consumer : consumers) { final List threadAssignment = assignment.get(consumer); - - for (final TaskId task : state.prevTasksByLag(consumer)) { - if (unassignedStatefulTasks.contains(task)) { - if (threadAssignment.size() < minStatefulTasksPerThread) { - threadAssignment.add(task); - unassignedStatefulTasks.remove(task); - } else { - unassignedTaskToPreviousOwner.put(task, consumer); + // The number of tasks we have to assign here to hit minTasksPerThread + final int tasksTargetCount = minTasksPerThread - threadLoad.getOrDefault(consumer, 0); + + if (isStateful) { + for (final TaskId task : state.prevTasksByLag(consumer)) { + if (unassignedTasks.contains(task)) { + if (threadAssignment.size() < tasksTargetCount) { + threadAssignment.add(task); + unassignedTasks.remove(task); + } else { + unassignedTaskToPreviousOwner.put(task, consumer); + } } } } - if (threadAssignment.size() < minStatefulTasksPerThread) { + if (threadAssignment.size() < tasksTargetCount) { consumersToFill.offer(consumer); } } // Next interleave remaining unassigned tasks amongst unfilled consumers while (!consumersToFill.isEmpty()) { - final TaskId task = unassignedStatefulTasks.poll(); + final 
TaskId task = unassignedTasks.poll(); if (task != null) { final String consumer = consumersToFill.poll(); final List threadAssignment = assignment.get(consumer); threadAssignment.add(task); - if (threadAssignment.size() < minStatefulTasksPerThread) { + final int threadTaskCount = threadAssignment.size() + threadLoad.getOrDefault(consumer, 0); + if (threadTaskCount < minTasksPerThread) { consumersToFill.offer(consumer); } } else { @@ -1090,54 +1113,43 @@ static Map> assignTasksToThreads(final Collection s } } - // At this point all consumers are at the min capacity, so there may be up to N - 1 unassigned - // stateful tasks still remaining that should now be distributed over the consumers - if (!unassignedStatefulTasks.isEmpty()) { - consumersToFill.addAll(consumers); + // At this point all consumers are at the min or min + 1 capacity. + // The min + 1 case can occur for standbys where there are fewer standbys than consumers and after assigning + // the active tasks some consumers already have min + 1 tasks assigned. 
+ // The tasks still remaining should now be distributed over the consumers that are still at min capacity + if (!unassignedTasks.isEmpty()) { + for (final String consumer : consumers) { + final int taskCount = assignment.get(consumer).size() + threadLoad.getOrDefault(consumer, 0); + if (taskCount == minTasksPerThread) { + consumersToFill.add(consumer); + } + } // Go over the tasks we skipped earlier and assign them to their previous owner when possible for (final Map.Entry taskEntry : unassignedTaskToPreviousOwner.entrySet()) { final TaskId task = taskEntry.getKey(); final String consumer = taskEntry.getValue(); - if (consumersToFill.contains(consumer) && unassignedStatefulTasks.contains(task)) { + if (consumersToFill.contains(consumer) && unassignedTasks.contains(task)) { assignment.get(consumer).add(task); - unassignedStatefulTasks.remove(task); + unassignedTasks.remove(task); // Remove this consumer since we know it is now at minCapacity + 1 consumersToFill.remove(consumer); } } // Now just distribute the remaining unassigned stateful tasks over the consumers still at min capacity - for (final TaskId task : unassignedStatefulTasks) { + for (final TaskId task : unassignedTasks) { final String consumer = consumersToFill.poll(); final List threadAssignment = assignment.get(consumer); threadAssignment.add(task); } - - - // There must be at least one consumer still at min capacity while all the others are at min - // capacity + 1, so start distributing stateless tasks to get all consumers back to the same count - while (unassignedStatelessTasksIter.hasNext()) { - final String consumer = consumersToFill.poll(); - if (consumer != null) { - final TaskId task = unassignedStatelessTasksIter.next(); - unassignedStatelessTasksIter.remove(); - assignment.get(consumer).add(task); - } else { - break; - } - } } } - - // Now just distribute tasks while circling through all the consumers - consumersToFill.addAll(consumers); - - while (unassignedStatelessTasksIter.hasNext()) { - 
final TaskId task = unassignedStatelessTasksIter.next(); - final String consumer = consumersToFill.poll(); - assignment.get(consumer).add(task); - consumersToFill.offer(consumer); + // Update threadLoad + for (final Map.Entry> taskEntry : assignment.entrySet()) { + final String consumer = taskEntry.getKey(); + final int totalCount = threadLoad.getOrDefault(consumer, 0) + taskEntry.getValue().size(); + threadLoad.put(consumer, totalCount); } return assignment; diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java index 24bf5b86a757b..6f5a978a3c5a2 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java @@ -319,9 +319,10 @@ public void shouldProduceStickyAndBalancedAssignmentWhenNothingChanges() { previousAssignment, assignTasksToThreads( allTasks, - emptySet(), + true, consumers, - state + state, + new HashMap<>() ) ); } @@ -353,9 +354,10 @@ public void shouldProduceStickyAndBalancedAssignmentWhenNewTasksAreAdded() { final Map> newAssignment = assignTasksToThreads( allTasks, - emptySet(), + true, consumers, - state + state, + new HashMap<>() ); previousAssignment.get(CONSUMER_2).add(newTask); @@ -386,9 +388,10 @@ public void shouldProduceMaximallyStickyAssignmentWhenMemberLeaves() { final Map> assignment = assignTasksToThreads( allTasks, - emptySet(), + true, consumers, - state + state, + new HashMap<>() ); // Each member should have all of its previous tasks reassigned plus some of consumer 3's tasks @@ -426,9 +429,10 @@ public void shouldProduceStickyEnoughAssignmentWhenNewMemberJoins() { final Map> assignment = assignTasksToThreads( allTasks, - emptySet(), + true, consumers, - state + state, + new HashMap<>() ); // we should move one task 
each from consumer 1 and consumer 3 to the new member, and none from consumer 2 @@ -466,9 +470,10 @@ public void shouldInterleaveTasksByGroupIdDuringNewAssignment() { final Map> interleavedTaskIds = assignTasksToThreads( allTasks, - emptySet(), + true, consumers, - state + state, + new HashMap<>() ); assertThat(interleavedTaskIds, equalTo(assignment)); @@ -997,6 +1002,154 @@ public void testAssignWithStandbyReplicas() { assertEquals(standbyPartitionsByHost, info20.standbyPartitionByHost()); } + @Test + public void testAssignWithStandbyReplicasBalanceSparse() { + builder.addSource(null, "source1", null, null, null, "topic1"); + builder.addProcessor("processor", new MockApiProcessorSupplier<>(), "source1"); + builder.addStateStore(new MockKeyValueStoreBuilder("store1", false), "processor"); + + final List topics = asList("topic1"); + + createMockTaskManager(EMPTY_TASKS, EMPTY_TASKS); + adminClient = createMockAdminClientForAssignor(getTopicPartitionOffsetsMap( + singletonList(APPLICATION_ID + "-store1-changelog"), + singletonList(3)) + ); + configurePartitionAssignorWith(Collections.singletonMap(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1)); + + final List client1Consumers = asList("consumer10", "consumer11", "consumer12", "consumer13"); + final List client2Consumers = asList("consumer20", "consumer21", "consumer22"); + + for (final String consumerId : client1Consumers) { + subscriptions.put(consumerId, + new Subscription( + topics, + getInfo(UUID_1, EMPTY_TASKS, EMPTY_TASKS, USER_END_POINT).encode())); + } + for (final String consumerId : client2Consumers) { + subscriptions.put(consumerId, + new Subscription( + topics, + getInfo(UUID_2, EMPTY_TASKS, EMPTY_TASKS, USER_END_POINT).encode())); + } + + final Map assignments = + partitionAssignor.assign(metadata, new GroupSubscription(subscriptions)).groupAssignment(); + + // Consumers + final AssignmentInfo info10 = AssignmentInfo.decode(assignments.get("consumer10").userData()); + final AssignmentInfo info11 = 
AssignmentInfo.decode(assignments.get("consumer11").userData()); + final AssignmentInfo info12 = AssignmentInfo.decode(assignments.get("consumer12").userData()); + final AssignmentInfo info13 = AssignmentInfo.decode(assignments.get("consumer13").userData()); + final AssignmentInfo info20 = AssignmentInfo.decode(assignments.get("consumer20").userData()); + final AssignmentInfo info21 = AssignmentInfo.decode(assignments.get("consumer21").userData()); + final AssignmentInfo info22 = AssignmentInfo.decode(assignments.get("consumer22").userData()); + + // Check each consumer has no more than 1 task + assertTrue(info10.activeTasks().size() + info10.standbyTasks().size() <= 1); + assertTrue(info11.activeTasks().size() + info11.standbyTasks().size() <= 1); + assertTrue(info12.activeTasks().size() + info12.standbyTasks().size() <= 1); + assertTrue(info13.activeTasks().size() + info13.standbyTasks().size() <= 1); + assertTrue(info20.activeTasks().size() + info20.standbyTasks().size() <= 1); + assertTrue(info21.activeTasks().size() + info21.standbyTasks().size() <= 1); + assertTrue(info22.activeTasks().size() + info22.standbyTasks().size() <= 1); + } + + @Test + public void testAssignWithStandbyReplicasBalanceDense() { + builder.addSource(null, "source1", null, null, null, "topic1"); + builder.addProcessor("processor", new MockApiProcessorSupplier<>(), "source1"); + builder.addStateStore(new MockKeyValueStoreBuilder("store1", false), "processor"); + + final List topics = asList("topic1"); + + createMockTaskManager(EMPTY_TASKS, EMPTY_TASKS); + adminClient = createMockAdminClientForAssignor(getTopicPartitionOffsetsMap( + singletonList(APPLICATION_ID + "-store1-changelog"), + singletonList(3)) + ); + configurePartitionAssignorWith(Collections.singletonMap(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1)); + + subscriptions.put("consumer10", + new Subscription( + topics, + getInfo(UUID_1, EMPTY_TASKS, EMPTY_TASKS, USER_END_POINT).encode())); + subscriptions.put("consumer20", + new 
Subscription( + topics, + getInfo(UUID_2, EMPTY_TASKS, EMPTY_TASKS, USER_END_POINT).encode())); + + final Map assignments = + partitionAssignor.assign(metadata, new GroupSubscription(subscriptions)).groupAssignment(); + + // Consumers + final AssignmentInfo info10 = AssignmentInfo.decode(assignments.get("consumer10").userData()); + final AssignmentInfo info20 = AssignmentInfo.decode(assignments.get("consumer20").userData()); + + // Check each consumer has 3 tasks + assertEquals(3, info10.activeTasks().size() + info10.standbyTasks().size()); + assertEquals(3, info20.activeTasks().size() + info20.standbyTasks().size()); + // Check that not all the actives are on one node + assertTrue(info10.activeTasks().size() < 3); + assertTrue(info20.activeTasks().size() < 3); + } + + @Test + public void testAssignWithStandbyReplicasBalanceWithStatelessTasks() { + builder.addSource(null, "source1", null, null, null, "topic1"); + builder.addProcessor("processor_with_state", new MockApiProcessorSupplier<>(), "source1"); + builder.addStateStore(new MockKeyValueStoreBuilder("store1", false), "processor_with_state"); + + builder.addSource(null, "source2", null, null, null, "topic2"); + builder.addProcessor("processor", new MockApiProcessorSupplier<>(), "source2"); + + final List topics = asList("topic1", "topic2"); + + createMockTaskManager(EMPTY_TASKS, EMPTY_TASKS); + adminClient = createMockAdminClientForAssignor(getTopicPartitionOffsetsMap( + singletonList(APPLICATION_ID + "-store1-changelog"), + singletonList(3)) + ); + configurePartitionAssignorWith(Collections.singletonMap(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1)); + + subscriptions.put("consumer10", + new Subscription( + topics, + getInfo(UUID_1, EMPTY_TASKS, EMPTY_TASKS, USER_END_POINT).encode())); + subscriptions.put("consumer11", + new Subscription( + topics, + getInfo(UUID_1, EMPTY_TASKS, EMPTY_TASKS, USER_END_POINT).encode())); + subscriptions.put("consumer20", + new Subscription( + topics, + getInfo(UUID_2, 
EMPTY_TASKS, EMPTY_TASKS, USER_END_POINT).encode())); + subscriptions.put("consumer21", + new Subscription( + topics, + getInfo(UUID_2, EMPTY_TASKS, EMPTY_TASKS, USER_END_POINT).encode())); + + final Map assignments = + partitionAssignor.assign(metadata, new GroupSubscription(subscriptions)).groupAssignment(); + + // Consumers + final AssignmentInfo info10 = AssignmentInfo.decode(assignments.get("consumer10").userData()); + final AssignmentInfo info11 = AssignmentInfo.decode(assignments.get("consumer11").userData()); + final AssignmentInfo info20 = AssignmentInfo.decode(assignments.get("consumer20").userData()); + final AssignmentInfo info21 = AssignmentInfo.decode(assignments.get("consumer21").userData()); + + // 9 tasks spread over 4 consumers, so we should have no more than 3 tasks per consumer + assertTrue(info10.activeTasks().size() + info10.standbyTasks().size() <= 3); + assertTrue(info11.activeTasks().size() + info11.standbyTasks().size() <= 3); + assertTrue(info20.activeTasks().size() + info20.standbyTasks().size() <= 3); + assertTrue(info21.activeTasks().size() + info21.standbyTasks().size() <= 3); + // No more than 1 standby per node. + assertTrue(info10.standbyTasks().size() <= 1); + assertTrue(info11.standbyTasks().size() <= 1); + assertTrue(info20.standbyTasks().size() <= 1); + assertTrue(info21.standbyTasks().size() <= 1); + } + @Test public void testOnAssignment() { taskManager = EasyMock.createStrictMock(TaskManager.class);