diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/assignment/TaskAssignmentUtils.java b/streams/src/main/java/org/apache/kafka/streams/processor/assignment/TaskAssignmentUtils.java index f67e54b2e1c4d..39a698adfa516 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/assignment/TaskAssignmentUtils.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/assignment/TaskAssignmentUtils.java @@ -27,6 +27,7 @@ import java.util.Optional; import java.util.Set; import java.util.SortedSet; +import java.util.TreeSet; import java.util.function.BiConsumer; import java.util.function.BiFunction; import java.util.function.Function; @@ -55,7 +56,103 @@ public final class TaskAssignmentUtils { private TaskAssignmentUtils() {} /** - * Return an {@code AssignmentError} for a task assignment created for an application. + * A simple config container for necessary parameters and optional overrides to apply when + * running the active or standby task rack-aware optimizations. + */ + public static class RackAwareOptimizationParams { + private final ApplicationState applicationState; + private final Optional trafficCostOverride; + private final Optional nonOverlapCostOverride; + private final Optional> tasksToOptimize; + + private RackAwareOptimizationParams(final ApplicationState applicationState, + final Optional trafficCostOverride, + final Optional nonOverlapCostOverride, + final Optional> tasksToOptimize) { + this.applicationState = applicationState; + this.trafficCostOverride = trafficCostOverride; + this.nonOverlapCostOverride = nonOverlapCostOverride; + this.tasksToOptimize = tasksToOptimize; + } + + /** + * Return a new config object with no overrides and the tasksToOptimize initialized to the set of all tasks in the given ApplicationState + */ + public static RackAwareOptimizationParams of(final ApplicationState applicationState) { + return new RackAwareOptimizationParams(applicationState, Optional.empty(), Optional.empty(), Optional.empty()); + } + + /** + * Return a new config object with the tasksToOptimize set to all stateful tasks in the given ApplicationState + */ + public RackAwareOptimizationParams forStatefulTasks() { + final SortedSet tasks = applicationState.allTasks().values() + .stream() + .filter(TaskInfo::isStateful) + .map(TaskInfo::id) + .collect(Collectors.toCollection(TreeSet::new)); + return forTasks(tasks); + } + + /** + * Return a new config object with the tasksToOptimize set to all stateless tasks in the given ApplicationState + */ + public RackAwareOptimizationParams forStatelessTasks() { + final SortedSet tasks = applicationState.allTasks().values() + .stream() + .filter(taskInfo -> !taskInfo.isStateful()) + .map(TaskInfo::id) + .collect(Collectors.toCollection(TreeSet::new)); + return forTasks(tasks); + } + + /** + * Return a new config object with the provided tasksToOptimize + */ + public RackAwareOptimizationParams forTasks(final SortedSet tasksToOptimize) { + return new RackAwareOptimizationParams( + applicationState, + trafficCostOverride, + nonOverlapCostOverride, + Optional.of(tasksToOptimize) + ); + } + + /** + * Return a new config object with the provided trafficCost override applied + */ + public RackAwareOptimizationParams withTrafficCostOverride(final int trafficCostOverride) { + return new RackAwareOptimizationParams( + applicationState, + Optional.of(trafficCostOverride), + nonOverlapCostOverride, + tasksToOptimize + ); + } + + /** + * Return a new config object with the provided nonOverlapCost override applied + */ + public RackAwareOptimizationParams withNonOverlapCostOverride(final int nonOverlapCostOverride) { + return new RackAwareOptimizationParams( + applicationState, + trafficCostOverride, + Optional.of(nonOverlapCostOverride), + tasksToOptimize + ); + } + } + + /** + * Validate the passed-in {@link TaskAssignment} and return an {@link AssignmentError} representing the + * first error detected in the assignment, or {@link AssignmentError#NONE} if the assignment passes the + * verification check. + *

+ * Note: this verification is performed automatically by the StreamsPartitionAssignor on the assignment + * returned by the TaskAssignor, and the error returned to the assignor via the {@link TaskAssignor#onAssignmentComputed} + * callback. Therefore, it is not required to call this manually from the {@link TaskAssignor#assign} method. + * However, if an invalid assignment is returned it will fail the rebalance and kill the thread, so it may be useful to + * utilize this method in an assignor to verify the assignment before returning it and fix any errors it finds. * * @param applicationState The application for which this task assignment is being assessed. * @param taskAssignment The task assignment that will be validated. @@ -153,16 +250,14 @@ public static Map identityAssignment(final Ap * If rack-aware client tags are configured, the rack-aware standby task assignor will be used * * @param applicationState the metadata and other info describing the current application state - * @param kafkaStreamsAssignments the current assignment of tasks to KafkaStreams clients - * - * @return a new map containing the mappings from KafkaStreamsAssignments updated with the default standby assignment + * @param kafkaStreamsAssignments the KafkaStreams client assignments to add standby tasks to */ - public static Map defaultStandbyTaskAssignment(final ApplicationState applicationState, - final Map kafkaStreamsAssignments) { + public static void defaultStandbyTaskAssignment(final ApplicationState applicationState, + final Map kafkaStreamsAssignments) { if (!applicationState.assignmentConfigs().rackAwareAssignmentTags().isEmpty()) { - return tagBasedStandbyTaskAssignment(applicationState, kafkaStreamsAssignments); + tagBasedStandbyTaskAssignment(applicationState, kafkaStreamsAssignments); } else { - return loadBasedStandbyTaskAssignment(applicationState, kafkaStreamsAssignments); + loadBasedStandbyTaskAssignment(applicationState, kafkaStreamsAssignments); } } @@ -185,34 +280,43 @@ public static Map defaultStandbyTaskAssignmen *

* This method optimizes cross-rack traffic for active tasks only. For standby task optimization, * use {@link #optimizeRackAwareStandbyTasks}. + *

+ * It is recommended to run this optimization before assigning any standby tasks, especially if you have configured + * your KafkaStreams clients with assignment tags via the rack.aware.assignment.tags config since this method may + * shuffle around active tasks without considering the client tags and can result in a violation of the original + * client tag assignment's constraints. * - * @param applicationState the metadata and other info describing the current application state + * @param optimizationParams optional configuration parameters to apply * @param kafkaStreamsAssignments the current assignment of tasks to KafkaStreams clients - * @param tasks the set of tasks to reassign if possible. Must already be assigned to a KafkaStreams client - * - * @return a map with the KafkaStreamsAssignments updated to minimize cross-rack traffic for active tasks */ - public static Map optimizeRackAwareActiveTasks(final ApplicationState applicationState, - final Map kafkaStreamsAssignments, - final SortedSet tasks) { - if (tasks.isEmpty()) { - return kafkaStreamsAssignments; + public static void optimizeRackAwareActiveTasks(final RackAwareOptimizationParams optimizationParams, + final Map kafkaStreamsAssignments) { + final ApplicationState applicationState = optimizationParams.applicationState; + final SortedSet activeTasksToOptimize = getTasksToOptimize(kafkaStreamsAssignments, optimizationParams, AssignedTask.Type.ACTIVE); + if (activeTasksToOptimize.isEmpty()) { + return; } - if (!canPerformRackAwareOptimization(applicationState, AssignedTask.Type.ACTIVE)) { - return kafkaStreamsAssignments; + if (!canPerformRackAwareOptimization(applicationState, optimizationParams, AssignedTask.Type.ACTIVE)) { + return; } initializeAssignmentsForAllClients(applicationState, kafkaStreamsAssignments); - final int crossRackTrafficCost = applicationState.assignmentConfigs().rackAwareTrafficCost().getAsInt(); - final int nonOverlapCost = applicationState.assignmentConfigs().rackAwareNonOverlapCost().getAsInt(); + final int crossRackTrafficCost = + optimizationParams.trafficCostOverride.orElseGet(() -> applicationState.assignmentConfigs() + .rackAwareTrafficCost() + .getAsInt()); + final int nonOverlapCost = + optimizationParams.nonOverlapCostOverride.orElseGet(() -> applicationState.assignmentConfigs() + .rackAwareNonOverlapCost() + .getAsInt()); final Map kafkaStreamsStates = applicationState.kafkaStreamsStates(false); - final List taskIds = new ArrayList<>(tasks); + final List taskIds = new ArrayList<>(activeTasksToOptimize); final Map> topicPartitionsByTaskId = applicationState.allTasks().values().stream() - .filter(taskInfo -> tasks.contains(taskInfo.id())) + .filter(taskInfo -> activeTasksToOptimize.contains(taskInfo.id())) .collect(Collectors.toMap(TaskInfo::id, TaskInfo::topicPartitions)); final List clientIds = new ArrayList<>(kafkaStreamsStates.keySet()); @@ -259,8 +363,6 @@ public static Map optimizeRackAwareActiveTask (assignment, taskId) -> assignment.removeTask(new AssignedTask(taskId, AssignedTask.Type.ACTIVE)), (assignment, taskId) -> assignment.tasks().containsKey(taskId) && assignment.tasks().get(taskId).type() == AssignedTask.Type.ACTIVE ); - - return kafkaStreamsAssignments; } /** @@ -283,31 +385,34 @@ public static Map optimizeRackAwareActiveTask * This method optimizes cross-rack traffic for standby tasks only. For active task optimization, * use {@link #optimizeRackAwareActiveTasks}. * + * @param optimizationParams optional configuration parameters to apply * @param kafkaStreamsAssignments the current assignment of tasks to KafkaStreams clients - * @param applicationState the metadata and other info describing the current application state - * - * @return a map with the KafkaStreamsAssignments updated to minimize cross-rack traffic for standby tasks */ - public static Map optimizeRackAwareStandbyTasks(final ApplicationState applicationState, - final Map kafkaStreamsAssignments) { - if (!canPerformRackAwareOptimization(applicationState, AssignedTask.Type.STANDBY)) { - return kafkaStreamsAssignments; + public static void optimizeRackAwareStandbyTasks(final RackAwareOptimizationParams optimizationParams, + final Map kafkaStreamsAssignments) { + final ApplicationState applicationState = optimizationParams.applicationState; + final SortedSet standbyTasksToOptimize = getTasksToOptimize(kafkaStreamsAssignments, optimizationParams, AssignedTask.Type.STANDBY); + if (standbyTasksToOptimize.isEmpty()) { + return; + } + + if (!canPerformRackAwareOptimization(applicationState, optimizationParams, AssignedTask.Type.STANDBY)) { + return; } initializeAssignmentsForAllClients(applicationState, kafkaStreamsAssignments); - final int crossRackTrafficCost = applicationState.assignmentConfigs().rackAwareTrafficCost().getAsInt(); - final int nonOverlapCost = applicationState.assignmentConfigs().rackAwareNonOverlapCost().getAsInt(); + final int crossRackTrafficCost = + optimizationParams.trafficCostOverride.orElseGet(() -> applicationState.assignmentConfigs() + .rackAwareTrafficCost() + .getAsInt()); + final int nonOverlapCost = + optimizationParams.nonOverlapCostOverride.orElseGet(() -> applicationState.assignmentConfigs() + .rackAwareNonOverlapCost() + .getAsInt()); final Map kafkaStreamsStates = applicationState.kafkaStreamsStates(false); - final List standbyTasksToOptimize = kafkaStreamsAssignments.values().stream() - .flatMap(r -> r.tasks().values().stream()) - .filter(task -> task.type() == AssignedTask.Type.STANDBY) - .map(AssignedTask::id) - .distinct() - .collect(Collectors.toList()); - final Map> topicPartitionsByTaskId = applicationState.allTasks().values().stream().collect(Collectors.toMap( TaskInfo::id, @@ -317,7 +422,7 @@ public static Map optimizeRackAwareStandbyTas final List clientIds = new ArrayList<>(kafkaStreamsStates.keySet()); final long initialCost = computeTotalAssignmentCost( topicPartitionsByTaskId, - standbyTasksToOptimize, + new ArrayList<>(standbyTasksToOptimize), clientIds, kafkaStreamsAssignments, kafkaStreamsStates, @@ -411,7 +516,7 @@ public static Map optimizeRackAwareStandbyTas } final long finalCost = computeTotalAssignmentCost( topicPartitionsByTaskId, - standbyTasksToOptimize, + new ArrayList<>(standbyTasksToOptimize), clientIds, kafkaStreamsAssignments, kafkaStreamsStates, @@ -424,7 +529,6 @@ public static Map optimizeRackAwareStandbyTas final long duration = System.currentTimeMillis() - startTime; LOG.info("Assignment after {} rounds and {} milliseconds for standby task optimization is {}\n with cost {}", round, duration, kafkaStreamsAssignments, finalCost); - return kafkaStreamsAssignments; } private static long computeTotalAssignmentCost(final Map> topicPartitionsByTaskId, @@ -541,6 +645,7 @@ private static int getCrossRackTrafficCost(final Set topicPa * is set. */ private static boolean canPerformRackAwareOptimization(final ApplicationState applicationState, + final RackAwareOptimizationParams optimizationParams, final AssignedTask.Type taskType) { final AssignmentConfigs assignmentConfigs = applicationState.assignmentConfigs(); final String rackAwareAssignmentStrategy = assignmentConfigs.rackAwareAssignmentStrategy(); @@ -902,6 +1007,20 @@ private static void initializeAssignmentsForAllClients(final ApplicationState ap } } + private static SortedSet getTasksToOptimize(final Map assignments, + final RackAwareOptimizationParams optimizationParams, + final AssignedTask.Type taskType) { + if (optimizationParams != null && optimizationParams.tasksToOptimize.isPresent()) { + return optimizationParams.tasksToOptimize.get(); + } + + return assignments.values().stream() + .flatMap(r -> r.tasks().values().stream()) + .filter(task -> task.type() == taskType) + .map(AssignedTask::id) + .collect(Collectors.toCollection(TreeSet::new)); + } + private static class TagStatistics { private final Map> tagKeyToValues; private final Map, Set> tagEntryToClients; diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/assignment/assignors/StickyTaskAssignor.java b/streams/src/main/java/org/apache/kafka/streams/processor/assignment/assignors/StickyTaskAssignor.java index 827c9138c996a..3d5e5b247f776 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/assignment/assignors/StickyTaskAssignor.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/assignment/assignors/StickyTaskAssignor.java @@ -29,7 +29,6 @@ import java.util.Map; import java.util.Objects; import java.util.Set; -import java.util.TreeSet; import java.util.function.Function; import java.util.stream.Collectors; import org.apache.kafka.streams.processor.TaskId; @@ -39,9 +38,11 @@ import org.apache.kafka.streams.processor.assignment.KafkaStreamsState; import org.apache.kafka.streams.processor.assignment.ProcessId; import org.apache.kafka.streams.processor.assignment.TaskAssignmentUtils; +import org.apache.kafka.streams.processor.assignment.TaskAssignmentUtils.RackAwareOptimizationParams; import org.apache.kafka.streams.processor.assignment.TaskAssignor; import org.apache.kafka.streams.processor.assignment.TaskInfo; import org.apache.kafka.streams.processor.assignment.TaskTopicPartition; +import org.apache.kafka.streams.processor.internals.assignment.RackAwareTaskAssignor; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -94,21 +95,19 @@ private void optimizeActive(final ApplicationState applicationState, final Map currentAssignments = assignmentState.newAssignments; - final Set statefulTasks = applicationState.allTasks().values().stream() - .filter(TaskInfo::isStateful) - .map(TaskInfo::id) - .collect(Collectors.toSet()); - final Map optimizedAssignmentsForStatefulTasks = TaskAssignmentUtils.optimizeRackAwareActiveTasks( - applicationState, currentAssignments, new TreeSet<>(statefulTasks)); - - final Set statelessTasks = applicationState.allTasks().values().stream() - .filter(task -> !task.isStateful()) - .map(TaskInfo::id) - .collect(Collectors.toSet()); - final Map optimizedAssignmentsForAllTasks = TaskAssignmentUtils.optimizeRackAwareActiveTasks( - applicationState, optimizedAssignmentsForStatefulTasks, new TreeSet<>(statelessTasks)); - - assignmentState.processOptimizedAssignments(optimizedAssignmentsForAllTasks); + TaskAssignmentUtils.optimizeRackAwareActiveTasks( + RackAwareOptimizationParams.of(applicationState).forStatefulTasks(), + currentAssignments + ); + + TaskAssignmentUtils.optimizeRackAwareActiveTasks( + RackAwareOptimizationParams.of(applicationState) + .forStatelessTasks() + .withTrafficCostOverride(RackAwareTaskAssignor.STATELESS_TRAFFIC_COST) + .withNonOverlapCostOverride(RackAwareTaskAssignor.STATELESS_NON_OVERLAP_COST), + currentAssignments + ); + assignmentState.processOptimizedAssignments(currentAssignments); } private void optimizeStandby(final ApplicationState applicationState, final AssignmentState assignmentState) { @@ -120,10 +119,9 @@ private void optimizeStandby(final ApplicationState applicationState, final Assi return; } - final Map currentAssignments = assignmentState.newAssignments; - final Map optimizedAssignments = TaskAssignmentUtils.optimizeRackAwareStandbyTasks( - applicationState, currentAssignments); - assignmentState.processOptimizedAssignments(optimizedAssignments); + final Map assignments = assignmentState.newAssignments; + TaskAssignmentUtils.optimizeRackAwareStandbyTasks(RackAwareOptimizationParams.of(applicationState), assignments); + assignmentState.processOptimizedAssignments(assignments); } private static void assignActive(final ApplicationState applicationState, diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/TaskAssignmentUtilsTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/TaskAssignmentUtilsTest.java index 7bcf386ab2ab1..8eff5812284d7 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/TaskAssignmentUtilsTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/TaskAssignmentUtilsTest.java @@ -50,6 +50,7 @@ import org.apache.kafka.streams.processor.assignment.KafkaStreamsState; import org.apache.kafka.streams.processor.assignment.ProcessId; import org.apache.kafka.streams.processor.assignment.TaskAssignmentUtils; +import org.apache.kafka.streams.processor.assignment.TaskAssignmentUtils.RackAwareOptimizationParams; import org.apache.kafka.streams.processor.assignment.TaskInfo; import org.apache.kafka.streams.processor.assignment.TaskTopicPartition; import org.junit.Rule; @@ -88,14 +89,14 @@ public void shouldOptimizeActiveTaskSimple(final String strategy) { ); TaskAssignmentUtils.optimizeRackAwareActiveTasks( - applicationState, assignments, new TreeSet<>(tasks.keySet())); + RackAwareOptimizationParams.of(applicationState), assignments); assertThat(assignments.size(), equalTo(2)); assertThat(assignments.get(processId(1)).tasks().keySet(), equalTo(mkSet(TASK_0_1))); assertThat(assignments.get(processId(2)).tasks().keySet(), equalTo(mkSet(TASK_0_0))); // Repeated to make sure nothing gets shifted around after the first round of optimization. TaskAssignmentUtils.optimizeRackAwareActiveTasks( - applicationState, assignments, new TreeSet<>(tasks.keySet())); + RackAwareOptimizationParams.of(applicationState), assignments); assertThat(assignments.size(), equalTo(2)); assertThat(assignments.get(processId(1)).tasks().keySet(), equalTo(mkSet(TASK_0_1))); assertThat(assignments.get(processId(2)).tasks().keySet(), equalTo(mkSet(TASK_0_0))); @@ -127,7 +128,7 @@ public void shouldOptimizeStandbyTasksBasic(final String strategy) { mkAssignment(AssignedTask.Type.STANDBY, 3, TASK_0_0) ); - TaskAssignmentUtils.optimizeRackAwareStandbyTasks(applicationState, assignments); + TaskAssignmentUtils.optimizeRackAwareStandbyTasks(RackAwareOptimizationParams.of(applicationState), assignments); assertThat(assignments.size(), equalTo(3)); assertThat(assignments.get(processId(1)).tasks().keySet(), equalTo(mkSet(TASK_0_0, TASK_0_1))); assertThat(assignments.get(processId(2)).tasks().keySet(), equalTo(mkSet(TASK_0_0)));