diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/assignment/AssignmentConfigs.java b/streams/src/main/java/org/apache/kafka/streams/processor/assignment/AssignmentConfigs.java index 6a7ca68a50f72..abd9d50b8f417 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/assignment/AssignmentConfigs.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/assignment/AssignmentConfigs.java @@ -20,8 +20,6 @@ import java.util.OptionalInt; import org.apache.kafka.common.config.ConfigDef; import org.apache.kafka.streams.StreamsConfig; -import org.apache.kafka.streams.processor.assignment.assignors.StickyTaskAssignor; -import org.apache.kafka.streams.processor.internals.assignment.HighAvailabilityTaskAssignor; /** * Assignment related configs for the Kafka Streams {@link TaskAssignor}. @@ -43,26 +41,8 @@ public static AssignmentConfigs of(final StreamsConfig configs) { final long probingRebalanceIntervalMs = configs.getLong(StreamsConfig.PROBING_REBALANCE_INTERVAL_MS_CONFIG); final List rackAwareAssignmentTags = configs.getList(StreamsConfig.RACK_AWARE_ASSIGNMENT_TAGS_CONFIG); final String rackAwareAssignmentStrategy = configs.getString(StreamsConfig.RACK_AWARE_ASSIGNMENT_STRATEGY_CONFIG); - Integer rackAwareTrafficCost = configs.getInt(StreamsConfig.RACK_AWARE_ASSIGNMENT_TRAFFIC_COST_CONFIG); - Integer rackAwareNonOverlapCost = configs.getInt(StreamsConfig.RACK_AWARE_ASSIGNMENT_NON_OVERLAP_COST_CONFIG); - - final String assignorClassName = configs.getString(StreamsConfig.TASK_ASSIGNOR_CLASS_CONFIG); - if (StickyTaskAssignor.class.getName().equals(assignorClassName)) { - if (rackAwareTrafficCost == null) { - rackAwareTrafficCost = StickyTaskAssignor.DEFAULT_STICKY_TRAFFIC_COST; - } - if (rackAwareNonOverlapCost == null) { - rackAwareNonOverlapCost = StickyTaskAssignor.DEFAULT_STICKY_NON_OVERLAP_COST; - } - } else if (HighAvailabilityTaskAssignor.class.getName().equals(assignorClassName)) { - // TODO KAFKA-16869: replace with the HighAvailabilityTaskAssignor class once it implements the new TaskAssignor interface - if (rackAwareTrafficCost == null) { - rackAwareTrafficCost = HighAvailabilityTaskAssignor.DEFAULT_HIGH_AVAILABILITY_TRAFFIC_COST; - } - if (rackAwareNonOverlapCost == null) { - rackAwareNonOverlapCost = HighAvailabilityTaskAssignor.DEFAULT_HIGH_AVAILABILITY_NON_OVERLAP_COST; - } - } + final Integer rackAwareTrafficCost = configs.getInt(StreamsConfig.RACK_AWARE_ASSIGNMENT_TRAFFIC_COST_CONFIG); + final Integer rackAwareNonOverlapCost = configs.getInt(StreamsConfig.RACK_AWARE_ASSIGNMENT_NON_OVERLAP_COST_CONFIG); return new AssignmentConfigs( acceptableRecoveryLag, diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/assignment/KafkaStreamsAssignment.java b/streams/src/main/java/org/apache/kafka/streams/processor/assignment/KafkaStreamsAssignment.java index f5205c8422bdf..848219d8c72dc 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/assignment/KafkaStreamsAssignment.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/assignment/KafkaStreamsAssignment.java @@ -19,6 +19,7 @@ import static java.util.Collections.unmodifiableMap; import java.time.Instant; +import java.util.Arrays; import java.util.Map; import java.util.Optional; import java.util.Set; @@ -108,6 +109,16 @@ public Optional followupRebalanceDeadline() { return followupRebalanceDeadline; } + @Override + public String toString() { + return String.format( + "KafkaStreamsAssignment{%s, %s, %s}", + processId, + Arrays.toString(tasks.values().toArray(new AssignedTask[0])), + followupRebalanceDeadline + ); + } + public static class AssignedTask { private final TaskId id; private final Type taskType; @@ -157,5 +168,10 @@ public boolean equals(final Object obj) { final AssignedTask other = (AssignedTask) obj; return this.id.equals(other.id()) && this.taskType == other.taskType; } + + @Override + public String toString() { + return String.format("AssignedTask{%s, %s}", taskType, id); + } } } diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/assignment/ProcessId.java b/streams/src/main/java/org/apache/kafka/streams/processor/assignment/ProcessId.java index 9dd4025112a62..0a3c2c2bfb4e1 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/assignment/ProcessId.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/assignment/ProcessId.java @@ -35,6 +35,14 @@ public UUID id() { return id; } + /** + * + * @return a randomly generated process id + */ + public static ProcessId randomProcessId() { + return new ProcessId(UUID.randomUUID()); + } + @Override public String toString() { return id.toString(); diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/assignment/TaskAssignmentUtils.java b/streams/src/main/java/org/apache/kafka/streams/processor/assignment/TaskAssignmentUtils.java index 39a698adfa516..9af8a10cbd6d8 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/assignment/TaskAssignmentUtils.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/assignment/TaskAssignmentUtils.java @@ -59,16 +59,16 @@ private TaskAssignmentUtils() {} * A simple config container for necessary parameters and optional overrides to apply when * running the active or standby task rack-aware optimizations. */ - public static class RackAwareOptimizationParams { + public static final class RackAwareOptimizationParams { private final ApplicationState applicationState; private final Optional trafficCostOverride; private final Optional nonOverlapCostOverride; private final Optional> tasksToOptimize; private RackAwareOptimizationParams(final ApplicationState applicationState, - final Optional trafficCostOverride, - final Optional nonOverlapCostOverride, - final Optional> tasksToOptimize) { + final Optional trafficCostOverride, + final Optional nonOverlapCostOverride, + final Optional> tasksToOptimize) { this.applicationState = applicationState; this.trafficCostOverride = trafficCostOverride; this.nonOverlapCostOverride = nonOverlapCostOverride; diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/assignment/assignors/StickyTaskAssignor.java b/streams/src/main/java/org/apache/kafka/streams/processor/assignment/assignors/StickyTaskAssignor.java index 3d5e5b247f776..fe01b502a22df 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/assignment/assignors/StickyTaskAssignor.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/assignment/assignors/StickyTaskAssignor.java @@ -95,10 +95,15 @@ private void optimizeActive(final ApplicationState applicationState, final Map currentAssignments = assignmentState.newAssignments; - TaskAssignmentUtils.optimizeRackAwareActiveTasks( - RackAwareOptimizationParams.of(applicationState).forStatefulTasks(), - currentAssignments - ); + final RackAwareOptimizationParams statefulTaskParams = RackAwareOptimizationParams.of(applicationState) + .withTrafficCostOverride( + applicationState.assignmentConfigs().rackAwareTrafficCost().orElse(DEFAULT_STICKY_TRAFFIC_COST) + ) + .withNonOverlapCostOverride( + applicationState.assignmentConfigs().rackAwareNonOverlapCost().orElse(DEFAULT_STICKY_NON_OVERLAP_COST) + ) + .forStatefulTasks(); + TaskAssignmentUtils.optimizeRackAwareActiveTasks(statefulTaskParams, currentAssignments); TaskAssignmentUtils.optimizeRackAwareActiveTasks( RackAwareOptimizationParams.of(applicationState) @@ -120,7 +125,15 @@ private void optimizeStandby(final ApplicationState applicationState, final Assi } final Map assignments = assignmentState.newAssignments; - TaskAssignmentUtils.optimizeRackAwareStandbyTasks(RackAwareOptimizationParams.of(applicationState), assignments); + + final RackAwareOptimizationParams optimizationParams = RackAwareOptimizationParams.of(applicationState) + .withTrafficCostOverride( + applicationState.assignmentConfigs().rackAwareTrafficCost().orElse(DEFAULT_STICKY_TRAFFIC_COST) + ) + .withNonOverlapCostOverride( + applicationState.assignmentConfigs().rackAwareNonOverlapCost().orElse(DEFAULT_STICKY_NON_OVERLAP_COST) + ); + TaskAssignmentUtils.optimizeRackAwareStandbyTasks(optimizationParams, assignments); assignmentState.processOptimizedAssignments(assignments); } diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java index 38b164f1969df..887ef86faf54b 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java @@ -93,7 +93,6 @@ import static java.util.Collections.unmodifiableSet; import static java.util.Map.Entry.comparingByKey; -import static java.util.UUID.randomUUID; import static org.apache.kafka.common.utils.Utils.filterMap; import static org.apache.kafka.streams.processor.internals.ClientUtils.fetchCommittedOffsets; import static org.apache.kafka.streams.processor.internals.ClientUtils.fetchEndOffsetsResult; @@ -197,7 +196,7 @@ public interface UserTaskAssignmentListener { } // keep track of any future consumers in a "dummy" Client since we can't decipher their subscription - private static final ProcessId FUTURE_ID = new ProcessId(randomUUID()); + private static final ProcessId FUTURE_ID = ProcessId.randomProcessId(); protected static final Comparator PARTITION_COMPARATOR = Comparator.comparing(TopicPartition::topic).thenComparingInt(TopicPartition::partition); diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java index 9c7338f748dcb..43c07ee3c2811 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java @@ -18,7 +18,6 @@ import java.util.Arrays; import java.util.Optional; -import java.util.UUID; import java.util.concurrent.TimeUnit; import org.apache.kafka.clients.admin.Admin; import org.apache.kafka.clients.admin.AdminClient; @@ -60,20 +59,12 @@ import org.apache.kafka.streams.kstream.internals.InternalStreamsBuilder; import org.apache.kafka.streams.kstream.internals.MaterializedInternal; import org.apache.kafka.streams.processor.TaskId; -import org.apache.kafka.streams.processor.assignment.ApplicationState; -import org.apache.kafka.streams.processor.assignment.AssignmentConfigs; -import org.apache.kafka.streams.processor.assignment.KafkaStreamsAssignment; import org.apache.kafka.streams.processor.assignment.ProcessId; -import org.apache.kafka.streams.processor.assignment.TaskAssignmentUtils; -import org.apache.kafka.streams.processor.assignment.TaskInfo; import org.apache.kafka.streams.processor.internals.TopologyMetadata.Subtopology; import org.apache.kafka.streams.processor.internals.assignment.AssignmentInfo; import org.apache.kafka.streams.processor.internals.assignment.AssignorConfiguration; import org.apache.kafka.streams.processor.internals.assignment.AssignorError; import org.apache.kafka.streams.processor.internals.assignment.ClientState; -import org.apache.kafka.streams.processor.internals.assignment.DefaultApplicationState; -import org.apache.kafka.streams.processor.internals.assignment.DefaultTaskInfo; -import org.apache.kafka.streams.processor.internals.assignment.DefaultTaskTopicPartition; import org.apache.kafka.streams.processor.internals.assignment.FallbackPriorTaskAssignor; import org.apache.kafka.streams.processor.internals.assignment.HighAvailabilityTaskAssignor; import org.apache.kafka.streams.processor.internals.assignment.ReferenceContainer; @@ -2616,123 +2607,6 @@ public void testClientTags() { assertEquals(clientTags, partitionAssignor.clientTags()); } - @Test - public void testValidateTaskAssignment() { - createDefaultMockTaskManager(); - configureDefaultPartitionAssignor(); - - final StreamsConfig streamsConfig = new StreamsConfig(configProps()); - final AssignmentConfigs assignmentConfigs = AssignmentConfigs.of(streamsConfig); - final Set tasks = mkSet( - new DefaultTaskInfo( - new TaskId(1, 1), - false, - mkSet(), - mkSet( - new DefaultTaskTopicPartition( - new TopicPartition("t1", 1), - true, - false, - () -> { } - ) - ) - ) - ); - - final ProcessId clientUuid1 = new ProcessId(UUID.randomUUID()); - final ProcessId clientUuid2 = new ProcessId(UUID.randomUUID()); - final Map clients = mkMap( - mkEntry(clientUuid1, new StreamsPartitionAssignor.ClientMetadata(clientUuid1, "endpoint1:80", mkMap(), Optional.empty())), - mkEntry(clientUuid2, new StreamsPartitionAssignor.ClientMetadata(clientUuid1, "endpoint2:80", mkMap(), Optional.empty())) - ); - final ApplicationState applicationState = new DefaultApplicationState( - assignmentConfigs, - tasks.stream().collect(Collectors.toMap( - TaskInfo::id, - t -> t - )), - clients - ); - - // **** - final org.apache.kafka.streams.processor.assignment.TaskAssignor.TaskAssignment noError = new org.apache.kafka.streams.processor.assignment.TaskAssignor.TaskAssignment( - mkSet( - KafkaStreamsAssignment.of(clientUuid1, mkSet( - new KafkaStreamsAssignment.AssignedTask( - new TaskId(1, 1), KafkaStreamsAssignment.AssignedTask.Type.ACTIVE - ) - )), - KafkaStreamsAssignment.of(clientUuid2, mkSet()) - ) - ); - org.apache.kafka.streams.processor.assignment.TaskAssignor.AssignmentError error = TaskAssignmentUtils.validateTaskAssignment(applicationState, noError); - assertEquals(org.apache.kafka.streams.processor.assignment.TaskAssignor.AssignmentError.NONE, error); - - // **** - final org.apache.kafka.streams.processor.assignment.TaskAssignor.TaskAssignment missingProcessId = new org.apache.kafka.streams.processor.assignment.TaskAssignor.TaskAssignment( - mkSet( - KafkaStreamsAssignment.of(clientUuid1, mkSet( - new KafkaStreamsAssignment.AssignedTask( - new TaskId(1, 1), KafkaStreamsAssignment.AssignedTask.Type.ACTIVE - ) - )) - ) - ); - error = TaskAssignmentUtils.validateTaskAssignment(applicationState, missingProcessId); - assertEquals(org.apache.kafka.streams.processor.assignment.TaskAssignor.AssignmentError.MISSING_PROCESS_ID, error); - - // **** - final org.apache.kafka.streams.processor.assignment.TaskAssignor.TaskAssignment unknownProcessId = new org.apache.kafka.streams.processor.assignment.TaskAssignor.TaskAssignment( - mkSet( - KafkaStreamsAssignment.of(clientUuid1, mkSet( - new KafkaStreamsAssignment.AssignedTask( - new TaskId(1, 1), KafkaStreamsAssignment.AssignedTask.Type.ACTIVE - ) - )), - KafkaStreamsAssignment.of(clientUuid2, mkSet()), - KafkaStreamsAssignment.of(new ProcessId(UUID.randomUUID()), mkSet()) - ) - ); - error = TaskAssignmentUtils.validateTaskAssignment(applicationState, unknownProcessId); - assertEquals(org.apache.kafka.streams.processor.assignment.TaskAssignor.AssignmentError.UNKNOWN_PROCESS_ID, error); - - // **** - final org.apache.kafka.streams.processor.assignment.TaskAssignor.TaskAssignment unknownTaskId = new org.apache.kafka.streams.processor.assignment.TaskAssignor.TaskAssignment( - mkSet( - KafkaStreamsAssignment.of(clientUuid1, mkSet( - new KafkaStreamsAssignment.AssignedTask( - new TaskId(1, 1), KafkaStreamsAssignment.AssignedTask.Type.ACTIVE - ) - )), - KafkaStreamsAssignment.of(clientUuid2, mkSet( - new KafkaStreamsAssignment.AssignedTask( - new TaskId(13, 13), KafkaStreamsAssignment.AssignedTask.Type.ACTIVE - ) - )) - ) - ); - error = TaskAssignmentUtils.validateTaskAssignment(applicationState, unknownTaskId); - assertEquals(org.apache.kafka.streams.processor.assignment.TaskAssignor.AssignmentError.UNKNOWN_TASK_ID, error); - - // **** - final org.apache.kafka.streams.processor.assignment.TaskAssignor.TaskAssignment activeTaskDuplicated = new org.apache.kafka.streams.processor.assignment.TaskAssignor.TaskAssignment( - mkSet( - KafkaStreamsAssignment.of(clientUuid1, mkSet( - new KafkaStreamsAssignment.AssignedTask( - new TaskId(1, 1), KafkaStreamsAssignment.AssignedTask.Type.ACTIVE - ) - )), - KafkaStreamsAssignment.of(clientUuid2, mkSet( - new KafkaStreamsAssignment.AssignedTask( - new TaskId(1, 1), KafkaStreamsAssignment.AssignedTask.Type.ACTIVE - ) - )) - ) - ); - error = TaskAssignmentUtils.validateTaskAssignment(applicationState, activeTaskDuplicated); - assertEquals(org.apache.kafka.streams.processor.assignment.TaskAssignor.AssignmentError.ACTIVE_TASK_ASSIGNED_MULTIPLE_TIMES, error); - } - private static class CorruptedInternalTopologyBuilder extends InternalTopologyBuilder { private Map corruptedTopicGroups; diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java index c6f568466e596..22fd4052b34d8 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java @@ -16,7 +16,6 @@ */ package org.apache.kafka.streams.processor.internals; -import java.util.UUID; import org.apache.kafka.clients.admin.Admin; import org.apache.kafka.clients.admin.DeleteRecordsResult; import org.apache.kafka.clients.admin.DeletedRecords; @@ -240,7 +239,7 @@ private TaskManager setUpTaskManager(final ProcessingMode processingMode, final TaskManager taskManager = new TaskManager( time, changeLogReader, - new ProcessId(UUID.randomUUID()), + ProcessId.randomProcessId(), "taskManagerTest", activeTaskCreator, standbyTaskCreator, diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/KafkaStreamsAssignmentTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/KafkaStreamsAssignmentTest.java new file mode 100644 index 0000000000000..ef595d142b68e --- /dev/null +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/KafkaStreamsAssignmentTest.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.processor.internals.assignment; + +import static org.apache.kafka.common.utils.Utils.mkSet; +import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.TASK_0_0; +import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.TASK_0_1; +import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.TASK_0_2; +import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.processIdForInt; +import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.MatcherAssert.assertThat; + +import org.apache.kafka.streams.processor.assignment.KafkaStreamsAssignment; +import org.apache.kafka.streams.processor.assignment.KafkaStreamsAssignment.AssignedTask; +import org.junit.Test; + +public class KafkaStreamsAssignmentTest { + @Test + public void shouldHaveReadableString() { + final KafkaStreamsAssignment assignment = KafkaStreamsAssignment.of( + processIdForInt(1), + mkSet( + new AssignedTask(TASK_0_0, AssignedTask.Type.ACTIVE), + new AssignedTask(TASK_0_1, AssignedTask.Type.STANDBY), + new AssignedTask(TASK_0_2, AssignedTask.Type.ACTIVE) + ) + ); + + assertThat( + assignment.toString(), + equalTo("KafkaStreamsAssignment{00000000-0000-0000-0000-000000000001, " + + "[AssignedTask{ACTIVE, 0_2}, AssignedTask{STANDBY, 0_1}, AssignedTask{ACTIVE, 0_0}], " + + "Optional.empty}")); + } +} diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/KafkaStreamsStateTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/KafkaStreamsStateTest.java index a0b0c457a3bcd..b28cd3678a10b 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/KafkaStreamsStateTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/KafkaStreamsStateTest.java @@ -31,7 +31,6 @@ import java.util.Optional; import java.util.TreeMap; import java.util.TreeSet; -import java.util.UUID; import org.apache.kafka.streams.processor.assignment.KafkaStreamsState; import org.apache.kafka.streams.processor.assignment.ProcessId; import org.junit.Test; @@ -40,7 +39,7 @@ public class KafkaStreamsStateTest { @Test public void shouldCorrectlyReturnTasksByLag() { final KafkaStreamsState state = new DefaultKafkaStreamsState( - new ProcessId(UUID.randomUUID()), + ProcessId.randomProcessId(), 10, mkMap(), mkSortedSet(NAMED_TASK_T0_0_0, NAMED_TASK_T0_0_1), @@ -71,7 +70,7 @@ public void shouldCorrectlyReturnTasksByLag() { @Test public void shouldThrowExceptionOnLagOperationsIfLagsWereNotComputed() { final KafkaStreamsState state = new DefaultKafkaStreamsState( - new ProcessId(UUID.randomUUID()), + ProcessId.randomProcessId(), 10, mkMap(), mkSortedSet(NAMED_TASK_T0_0_0, NAMED_TASK_T0_0_1), diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/TaskAssignmentUtilsTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/TaskAssignmentUtilsTest.java index 8eff5812284d7..2295a865aa744 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/TaskAssignmentUtilsTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/TaskAssignmentUtilsTest.java @@ -25,10 +25,12 @@ import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.TASK_0_3; import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.TASK_0_4; import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.TASK_0_5; +import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.TASK_1_1; import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.processIdForInt; import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.MatcherAssert.assertThat; +import java.util.Arrays; import java.util.Collections; import java.util.HashSet; import java.util.List; @@ -50,11 +52,12 @@ import org.apache.kafka.streams.processor.assignment.KafkaStreamsState; import org.apache.kafka.streams.processor.assignment.ProcessId; import org.apache.kafka.streams.processor.assignment.TaskAssignmentUtils; +import org.apache.kafka.streams.processor.assignment.TaskAssignor; import org.apache.kafka.streams.processor.assignment.TaskAssignmentUtils.RackAwareOptimizationParams; import org.apache.kafka.streams.processor.assignment.TaskInfo; import org.apache.kafka.streams.processor.assignment.TaskTopicPartition; import org.junit.Rule; -import org.junit.Test; +import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.ValueSource; import org.junit.rules.Timeout; @@ -208,6 +211,222 @@ public void shouldAssignStandbyTasksByClientLoad() { assertThat(assignments.get(processId(5)).tasks().keySet(), equalTo(mkSet(TASK_0_0))); } + @ParameterizedTest + @ValueSource(strings = { + StreamsConfig.RACK_AWARE_ASSIGNMENT_STRATEGY_MIN_TRAFFIC, + StreamsConfig.RACK_AWARE_ASSIGNMENT_STRATEGY_BALANCE_SUBTOPOLOGY, + }) + public void shouldNotViolateClientTagsAssignmentDuringStandbyOptimization(final String strategy) { + final AssignmentConfigs assignmentConfigs = defaultAssignmentConfigs( + strategy, 100, 1, 2, Collections.singletonList("az")); + final Map tasks = mkMap( + mkTaskInfo(TASK_0_0, true, mkSet("r1")), + mkTaskInfo(TASK_0_1, true, mkSet("r1")) + ); + final Map kafkaStreamsStates = mkMap( + mkStreamState(1, 2, Optional.of("r1"), mkSet(), mkSet(), mkMap( + mkEntry("az", "1") + )), + mkStreamState(2, 2, Optional.of("r1"), mkSet(), mkSet(), mkMap( + mkEntry("az", "2") + )), + mkStreamState(3, 2, Optional.of("r1"), mkSet(), mkSet(), mkMap( + mkEntry("az", "3") + )), + mkStreamState(4, 2, Optional.of("r1"), mkSet(), mkSet(), mkMap( + mkEntry("az", "2") + )) + ); + final ApplicationState applicationState = new TestApplicationState( + assignmentConfigs, kafkaStreamsStates, tasks); + + final Map assignments = mkMap( + mkAssignment( + 1, + new AssignedTask(TASK_0_0, AssignedTask.Type.ACTIVE), + new AssignedTask(TASK_0_1, AssignedTask.Type.STANDBY) + ), + mkAssignment( + 2, + new AssignedTask(TASK_0_0, AssignedTask.Type.STANDBY), + new AssignedTask(TASK_0_1, AssignedTask.Type.ACTIVE) + ), + mkAssignment( + 3, + new AssignedTask(TASK_0_0, AssignedTask.Type.STANDBY), + new AssignedTask(TASK_0_1, AssignedTask.Type.STANDBY) + ), + mkAssignment(4) + ); + + TaskAssignmentUtils.optimizeRackAwareStandbyTasks(RackAwareOptimizationParams.of(applicationState), assignments); + assertThat(assignments.size(), equalTo(4)); + assertThat(assignments.get(processId(1)).tasks().keySet(), equalTo(mkSet(TASK_0_0, TASK_0_1))); + assertThat(assignments.get(processId(2)).tasks().keySet(), equalTo(mkSet(TASK_0_0, TASK_0_1))); + assertThat(assignments.get(processId(3)).tasks().keySet(), equalTo(mkSet(TASK_0_0, TASK_0_1))); + assertThat(assignments.get(processId(4)).tasks().keySet(), equalTo(mkSet())); + } + + @ParameterizedTest + @ValueSource(strings = { + StreamsConfig.RACK_AWARE_ASSIGNMENT_STRATEGY_MIN_TRAFFIC, + StreamsConfig.RACK_AWARE_ASSIGNMENT_STRATEGY_BALANCE_SUBTOPOLOGY, + }) + public void shouldOptimizeStandbyTasksWithMultipleRacks(final String strategy) { + final AssignmentConfigs assignmentConfigs = defaultAssignmentConfigs( + strategy, 100, 1, 1, Collections.emptyList()); + final Map tasks = mkMap( + mkTaskInfo(TASK_0_0, true, mkSet("rack-1", "rack-2")), + mkTaskInfo(TASK_0_1, true, mkSet("rack-2", "rack-3")), + mkTaskInfo(TASK_0_2, true, mkSet("rack-3", "rack-4")) + ); + final Map kafkaStreamsStates = mkMap( + mkStreamState(1, 2, Optional.of("rack-1")), + mkStreamState(2, 2, Optional.of("rack-2")), + mkStreamState(3, 2, Optional.of("rack-3")) + ); + final ApplicationState applicationState = new TestApplicationState( + assignmentConfigs, kafkaStreamsStates, tasks); + + final Map assignments = mkMap( + mkAssignment(AssignedTask.Type.ACTIVE, 1, TASK_0_0), + mkAssignment(AssignedTask.Type.ACTIVE, 2, TASK_0_1), + mkAssignment(AssignedTask.Type.ACTIVE, 3, TASK_0_2) + ); + + TaskAssignmentUtils.optimizeRackAwareActiveTasks( + RackAwareOptimizationParams.of(applicationState) + .forTasks(new TreeSet<>(mkSet(TASK_0_0, TASK_0_1, TASK_0_2))), + assignments + ); + assertThat(assignments.size(), equalTo(3)); + assertThat(assignments.get(processId(1)).tasks().keySet(), equalTo(mkSet(TASK_0_0))); + assertThat(assignments.get(processId(2)).tasks().keySet(), equalTo(mkSet(TASK_0_1))); + assertThat(assignments.get(processId(3)).tasks().keySet(), equalTo(mkSet(TASK_0_2))); + } + + @Test + public void shouldCorrectlyReturnIdentityAssignment() { + final AssignmentConfigs assignmentConfigs = defaultAssignmentConfigs( + StreamsConfig.RACK_AWARE_ASSIGNMENT_STRATEGY_NONE, 100, 1, 1, Collections.emptyList()); + final Map tasks = mkMap( + mkTaskInfo(TASK_0_0, true), + mkTaskInfo(TASK_0_1, true), + mkTaskInfo(TASK_0_2, true) + ); + final Map kafkaStreamsStates = mkMap( + mkStreamState(1, 5, Optional.empty(), mkSet(TASK_0_0, TASK_0_1, TASK_0_2), mkSet()), + mkStreamState(2, 5, Optional.empty(), mkSet(), mkSet(TASK_0_0, TASK_0_1, TASK_0_2)), + mkStreamState(3, 5, Optional.empty(), mkSet(), mkSet()), + mkStreamState(4, 5, Optional.empty(), mkSet(), mkSet()), + mkStreamState(5, 5, Optional.empty(), mkSet(), mkSet()) + ); + final ApplicationState applicationState = new TestApplicationState( + assignmentConfigs, kafkaStreamsStates, tasks); + + + final Map assignments = TaskAssignmentUtils.identityAssignment(applicationState); + assertThat(assignments.size(), equalTo(5)); + assertThat(assignments.get(processId(1)).tasks().keySet(), equalTo(mkSet(TASK_0_0, TASK_0_1, TASK_0_2))); + assertThat(assignments.get(processId(2)).tasks().keySet(), equalTo(mkSet(TASK_0_0, TASK_0_1, TASK_0_2))); + assertThat(assignments.get(processId(3)).tasks().keySet(), equalTo(mkSet())); + assertThat(assignments.get(processId(4)).tasks().keySet(), equalTo(mkSet())); + assertThat(assignments.get(processId(5)).tasks().keySet(), equalTo(mkSet())); + } + + @Test + public void testValidateTaskAssignment() { + final AssignmentConfigs assignmentConfigs = defaultAssignmentConfigs( + StreamsConfig.RACK_AWARE_ASSIGNMENT_STRATEGY_NONE, 100, 1, 1, Collections.emptyList()); + final Map tasks = mkMap( + mkTaskInfo(TASK_1_1, false) + ); + final Map kafkaStreamsStates = mkMap( + mkStreamState(1, 5, Optional.empty()), + mkStreamState(2, 5, Optional.empty()) + ); + final ApplicationState applicationState = new TestApplicationState( + assignmentConfigs, kafkaStreamsStates, tasks); + + // **** + final org.apache.kafka.streams.processor.assignment.TaskAssignor.TaskAssignment noError = new org.apache.kafka.streams.processor.assignment.TaskAssignor.TaskAssignment( + mkSet( + KafkaStreamsAssignment.of(processId(1), mkSet( + new KafkaStreamsAssignment.AssignedTask( + new TaskId(1, 1), KafkaStreamsAssignment.AssignedTask.Type.ACTIVE + ) + )), + KafkaStreamsAssignment.of(processId(2), mkSet()) + ) + ); + org.apache.kafka.streams.processor.assignment.TaskAssignor.AssignmentError error = TaskAssignmentUtils.validateTaskAssignment(applicationState, noError); + assertThat(error, equalTo(TaskAssignor.AssignmentError.NONE)); + + // **** + final org.apache.kafka.streams.processor.assignment.TaskAssignor.TaskAssignment missingProcessId = new org.apache.kafka.streams.processor.assignment.TaskAssignor.TaskAssignment( + mkSet( + KafkaStreamsAssignment.of(processId(1), mkSet( + new KafkaStreamsAssignment.AssignedTask( + new TaskId(1, 1), KafkaStreamsAssignment.AssignedTask.Type.ACTIVE + ) + )) + ) + ); + error = TaskAssignmentUtils.validateTaskAssignment(applicationState, missingProcessId); + assertThat(error, equalTo(TaskAssignor.AssignmentError.MISSING_PROCESS_ID)); + + // **** + final org.apache.kafka.streams.processor.assignment.TaskAssignor.TaskAssignment unknownProcessId = new org.apache.kafka.streams.processor.assignment.TaskAssignor.TaskAssignment( + mkSet( + KafkaStreamsAssignment.of(processId(1), mkSet( + new KafkaStreamsAssignment.AssignedTask( + new TaskId(1, 1), KafkaStreamsAssignment.AssignedTask.Type.ACTIVE + ) + )), + KafkaStreamsAssignment.of(processId(2), mkSet()), + KafkaStreamsAssignment.of(ProcessId.randomProcessId(), mkSet()) + ) + ); + error = TaskAssignmentUtils.validateTaskAssignment(applicationState, unknownProcessId); + assertThat(error, equalTo(TaskAssignor.AssignmentError.UNKNOWN_PROCESS_ID)); + + // **** + final org.apache.kafka.streams.processor.assignment.TaskAssignor.TaskAssignment unknownTaskId = new org.apache.kafka.streams.processor.assignment.TaskAssignor.TaskAssignment( + mkSet( + KafkaStreamsAssignment.of(processId(1), mkSet( + new KafkaStreamsAssignment.AssignedTask( + new TaskId(1, 1), KafkaStreamsAssignment.AssignedTask.Type.ACTIVE + ) + )), + KafkaStreamsAssignment.of(processId(2), mkSet( + new KafkaStreamsAssignment.AssignedTask( + new TaskId(13, 13), KafkaStreamsAssignment.AssignedTask.Type.ACTIVE + ) + )) + ) + ); + error = TaskAssignmentUtils.validateTaskAssignment(applicationState, unknownTaskId); + assertThat(error, equalTo(TaskAssignor.AssignmentError.UNKNOWN_TASK_ID)); + + // **** + final org.apache.kafka.streams.processor.assignment.TaskAssignor.TaskAssignment activeTaskDuplicated = new org.apache.kafka.streams.processor.assignment.TaskAssignor.TaskAssignment( + mkSet( + KafkaStreamsAssignment.of(processId(1), mkSet( + new KafkaStreamsAssignment.AssignedTask( + new TaskId(1, 1), KafkaStreamsAssignment.AssignedTask.Type.ACTIVE + ) + )), + KafkaStreamsAssignment.of(processId(2), mkSet( + new KafkaStreamsAssignment.AssignedTask( + new TaskId(1, 1), KafkaStreamsAssignment.AssignedTask.Type.ACTIVE + ) + )) + ) + ); + error = TaskAssignmentUtils.validateTaskAssignment(applicationState, activeTaskDuplicated); + assertThat(error, equalTo(TaskAssignor.AssignmentError.ACTIVE_TASK_ASSIGNED_MULTIPLE_TIMES)); + } + public static class TestApplicationState implements ApplicationState { private final AssignmentConfigs assignmentConfigs; @@ -293,6 +512,18 @@ public static Map.Entry mkAssignment(final As ); } + public static Map.Entry mkAssignment(final int client, + final AssignedTask... tasks) { + final ProcessId processId = processId(client); + return mkEntry( + processId, + KafkaStreamsAssignment.of( + processId, + Arrays.stream(tasks).collect(Collectors.toSet()) + ) + ); + } + public static Map.Entry mkTaskInfo(final TaskId taskId, final boolean isStateful) { return mkTaskInfo(taskId, isStateful, null); }