From 277c805fbcb092277e4aa6574cb4999490bafac5 Mon Sep 17 00:00:00 2001 From: Antoine Pourchet Date: Wed, 12 Jun 2024 20:26:18 -0600 Subject: [PATCH] KAFKA-15045: (KIP-924 pt. 24) internal TaskAssignor rename to LegacyTaskAssignor Since the new public API for TaskAssignor shared a name, this rename will prevent users from confusing the internal definition with the public one. --- .../internals/StreamsPartitionAssignor.java | 12 ++++++------ .../assignment/AssignorConfiguration.java | 4 ++-- .../assignment/FallbackPriorTaskAssignor.java | 2 +- .../HighAvailabilityTaskAssignor.java | 2 +- ...askAssignor.java => LegacyTaskAssignor.java} | 2 +- .../assignment/StandbyTaskAssignor.java | 2 +- .../assignment/StickyTaskAssignor.java | 2 +- .../TaskAssignorIntegrationTest.java | 17 +++++++++-------- .../internals/StreamsAssignmentScaleTest.java | 4 ++-- .../internals/StreamsPartitionAssignorTest.java | 6 +++--- 10 files changed, 27 insertions(+), 26 deletions(-) rename streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/{TaskAssignor.java => LegacyTaskAssignor.java} (97%) diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java index 887ef86faf54b..e0ddd0883776a 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java @@ -64,7 +64,7 @@ import org.apache.kafka.streams.processor.internals.assignment.ReferenceContainer; import org.apache.kafka.streams.processor.internals.assignment.StickyTaskAssignor; import org.apache.kafka.streams.processor.internals.assignment.SubscriptionInfo; -import org.apache.kafka.streams.processor.internals.assignment.TaskAssignor; +import org.apache.kafka.streams.processor.internals.assignment.LegacyTaskAssignor; import org.apache.kafka.streams.processor.internals.assignment.DefaultTaskInfo; import org.apache.kafka.streams.state.HostInfo; import org.slf4j.Logger; @@ -224,7 +224,7 @@ public interface UserTaskAssignmentListener { private Supplier> customTaskAssignorSupplier; - private Supplier internalTaskAssignorSupplier; + private Supplier legacyTaskAssignorSupplier; private byte uniqueField; private Map clientTags; @@ -259,7 +259,7 @@ public void configure(final Map configs) { copartitionedTopicsEnforcer = assignorConfiguration.copartitionedTopicsEnforcer(); rebalanceProtocol = assignorConfiguration.rebalanceProtocol(); customTaskAssignorSupplier = assignorConfiguration::customTaskAssignor; - internalTaskAssignorSupplier = assignorConfiguration::taskAssignor; + legacyTaskAssignorSupplier = assignorConfiguration::taskAssignor; assignmentListener = assignorConfiguration.assignmentListener(); uniqueField = 0; clientTags = referenceContainer.clientTags; @@ -817,7 +817,7 @@ private UserTaskAssignmentListener assignTasksToClients(final Cluster fullMetada }; } else { customTaskAssignmentListener = (assignment, subscription) -> { }; - final TaskAssignor taskAssignor = createTaskAssignor(lagComputationSuccessful); + final LegacyTaskAssignor taskAssignor = createTaskAssignor(lagComputationSuccessful); final RackAwareTaskAssignor rackAwareTaskAssignor = new RackAwareTaskAssignor( fullMetadata, partitionsForTask, @@ -859,8 +859,8 @@ private UserTaskAssignmentListener assignTasksToClients(final Cluster fullMetada return customTaskAssignmentListener; } - private TaskAssignor createTaskAssignor(final boolean lagComputationSuccessful) { - final TaskAssignor taskAssignor = internalTaskAssignorSupplier.get(); + private LegacyTaskAssignor createTaskAssignor(final boolean lagComputationSuccessful) { + final LegacyTaskAssignor taskAssignor = legacyTaskAssignorSupplier.get(); if (taskAssignor instanceof StickyTaskAssignor) { // special case: to preserve pre-existing behavior, we invoke the StickyTaskAssignor // whether or not lag computation failed. diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignorConfiguration.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignorConfiguration.java index 1971642942ab8..311863630af42 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignorConfiguration.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignorConfiguration.java @@ -242,9 +242,9 @@ public AssignmentConfigs assignmentConfigs() { return AssignmentConfigs.of(streamsConfig); } - public TaskAssignor taskAssignor() { + public LegacyTaskAssignor taskAssignor() { try { - return Utils.newInstance(internalTaskAssignorClass, TaskAssignor.class); + return Utils.newInstance(internalTaskAssignorClass, LegacyTaskAssignor.class); } catch (final ClassNotFoundException e) { throw new IllegalArgumentException( "Expected an instantiable class name for " + INTERNAL_TASK_ASSIGNOR_CLASS, diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/FallbackPriorTaskAssignor.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/FallbackPriorTaskAssignor.java index 93dcd6192b603..d7a07c6e2006b 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/FallbackPriorTaskAssignor.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/FallbackPriorTaskAssignor.java @@ -31,7 +31,7 @@ * 1. ignore the task lags in the ClientState map * 2. always return true, indicating that a follow-up rebalance is needed */ -public class FallbackPriorTaskAssignor implements TaskAssignor { +public class FallbackPriorTaskAssignor implements LegacyTaskAssignor { private final StickyTaskAssignor delegate; public FallbackPriorTaskAssignor() { diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/HighAvailabilityTaskAssignor.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/HighAvailabilityTaskAssignor.java index bda199ea33512..272cd824b4354 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/HighAvailabilityTaskAssignor.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/HighAvailabilityTaskAssignor.java @@ -43,7 +43,7 @@ import static org.apache.kafka.streams.processor.internals.assignment.TaskMovement.assignActiveTaskMovements; import static org.apache.kafka.streams.processor.internals.assignment.TaskMovement.assignStandbyTaskMovements; -public class HighAvailabilityTaskAssignor implements TaskAssignor { +public class HighAvailabilityTaskAssignor implements LegacyTaskAssignor { private static final Logger log = LoggerFactory.getLogger(HighAvailabilityTaskAssignor.class); public static final int DEFAULT_HIGH_AVAILABILITY_TRAFFIC_COST = 10; public static final int DEFAULT_HIGH_AVAILABILITY_NON_OVERLAP_COST = 1; diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/TaskAssignor.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/LegacyTaskAssignor.java similarity index 97% rename from streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/TaskAssignor.java rename to streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/LegacyTaskAssignor.java index e01c13b59dadc..9a7485b64aa70 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/TaskAssignor.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/LegacyTaskAssignor.java @@ -23,7 +23,7 @@ import org.apache.kafka.streams.processor.assignment.AssignmentConfigs; import org.apache.kafka.streams.processor.assignment.ProcessId; -public interface TaskAssignor { +public interface LegacyTaskAssignor { /** * @return whether the generated assignment requires a followup probing rebalance to satisfy all conditions */ diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/StandbyTaskAssignor.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/StandbyTaskAssignor.java index 022eaae078c72..4787ac6552339 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/StandbyTaskAssignor.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/StandbyTaskAssignor.java @@ -22,7 +22,7 @@ import org.apache.kafka.streams.processor.assignment.AssignmentConfigs; import org.apache.kafka.streams.processor.assignment.ProcessId; -interface StandbyTaskAssignor extends TaskAssignor { +interface StandbyTaskAssignor extends LegacyTaskAssignor { default boolean isAllowedTaskMovement(final ClientState source, final ClientState destination) { return true; } diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/StickyTaskAssignor.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/StickyTaskAssignor.java index b6953613d893c..94c5dfce1847c 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/StickyTaskAssignor.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/StickyTaskAssignor.java @@ -40,7 +40,7 @@ import java.util.Objects; import java.util.Set; -public class StickyTaskAssignor implements TaskAssignor { +public class StickyTaskAssignor implements LegacyTaskAssignor { private static final Logger log = LoggerFactory.getLogger(StickyTaskAssignor.class); diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/TaskAssignorIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/TaskAssignorIntegrationTest.java index 9d7d298f831dc..3e29773a151d4 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/TaskAssignorIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/TaskAssignorIntegrationTest.java @@ -30,7 +30,7 @@ import org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor; import org.apache.kafka.streams.processor.internals.assignment.AssignorConfiguration.AssignmentListener; import org.apache.kafka.streams.processor.internals.assignment.HighAvailabilityTaskAssignor; -import org.apache.kafka.streams.processor.internals.assignment.TaskAssignor; +import org.apache.kafka.streams.processor.internals.assignment.LegacyTaskAssignor; import org.apache.kafka.test.IntegrationTest; import org.apache.kafka.test.TestUtils; import org.junit.AfterClass; @@ -78,7 +78,8 @@ public static void closeCluster() { public TestName testName = new TestName(); // Just a dummy implementation so we can check the config - public static final class MyTaskAssignor extends HighAvailabilityTaskAssignor implements TaskAssignor { } + public static final class MyLegacyTaskAssignor extends HighAvailabilityTaskAssignor implements + LegacyTaskAssignor { } @SuppressWarnings("unchecked") @Test @@ -116,7 +117,7 @@ public void shouldProperlyConfigureTheAssignor() throws NoSuchFieldException, Il mkEntry(StreamsConfig.MAX_WARMUP_REPLICAS_CONFIG, "7"), mkEntry(StreamsConfig.PROBING_REBALANCE_INTERVAL_MS_CONFIG, "480000"), mkEntry(StreamsConfig.InternalConfig.ASSIGNMENT_LISTENER, configuredAssignmentListener), - mkEntry(StreamsConfig.InternalConfig.INTERNAL_TASK_ASSIGNOR_CLASS, MyTaskAssignor.class.getName()) + mkEntry(StreamsConfig.InternalConfig.INTERNAL_TASK_ASSIGNOR_CLASS, MyLegacyTaskAssignor.class.getName()) ) ); @@ -153,18 +154,18 @@ public void shouldProperlyConfigureTheAssignor() throws NoSuchFieldException, Il assignmentListenerField.setAccessible(true); final AssignmentListener actualAssignmentListener = (AssignmentListener) assignmentListenerField.get(streamsPartitionAssignor); - final Field taskAssignorSupplierField = StreamsPartitionAssignor.class.getDeclaredField("internalTaskAssignorSupplier"); + final Field taskAssignorSupplierField = StreamsPartitionAssignor.class.getDeclaredField("legacyTaskAssignorSupplier"); taskAssignorSupplierField.setAccessible(true); - final Supplier taskAssignorSupplier = - (Supplier) taskAssignorSupplierField.get(streamsPartitionAssignor); - final TaskAssignor taskAssignor = taskAssignorSupplier.get(); + final Supplier taskAssignorSupplier = + (Supplier) taskAssignorSupplierField.get(streamsPartitionAssignor); + final LegacyTaskAssignor taskAssignor = taskAssignorSupplier.get(); assertThat(configs.numStandbyReplicas(), is(5)); assertThat(configs.acceptableRecoveryLag(), is(6L)); assertThat(configs.maxWarmupReplicas(), is(7)); assertThat(configs.probingRebalanceIntervalMs(), is(480000L)); assertThat(actualAssignmentListener, sameInstance(configuredAssignmentListener)); - assertThat(taskAssignor, instanceOf(MyTaskAssignor.class)); + assertThat(taskAssignor, instanceOf(MyLegacyTaskAssignor.class)); } } } diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsAssignmentScaleTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsAssignmentScaleTest.java index b3c7131341bf7..c782eb3dbf68c 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsAssignmentScaleTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsAssignmentScaleTest.java @@ -36,7 +36,7 @@ import org.apache.kafka.streams.processor.internals.assignment.HighAvailabilityTaskAssignor; import org.apache.kafka.streams.processor.internals.assignment.ReferenceContainer; import org.apache.kafka.streams.processor.internals.assignment.StickyTaskAssignor; -import org.apache.kafka.streams.processor.internals.assignment.TaskAssignor; +import org.apache.kafka.streams.processor.internals.assignment.LegacyTaskAssignor; import org.apache.kafka.test.IntegrationTest; import org.apache.kafka.test.MockApiProcessorSupplier; import org.apache.kafka.test.MockClientSupplier; @@ -150,7 +150,7 @@ private void completeLargeAssignment(final int numPartitions, final int numClients, final int numThreadsPerClient, final int numStandbys, - final Class taskAssignor) { + final Class taskAssignor) { final List topic = singletonList("topic"); final Map changelogEndOffsets = new HashMap<>(); diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java index 43c07ee3c2811..ac5b318008136 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java @@ -70,7 +70,7 @@ import org.apache.kafka.streams.processor.internals.assignment.ReferenceContainer; import org.apache.kafka.streams.processor.internals.assignment.StickyTaskAssignor; import org.apache.kafka.streams.processor.internals.assignment.SubscriptionInfo; -import org.apache.kafka.streams.processor.internals.assignment.TaskAssignor; +import org.apache.kafka.streams.processor.internals.assignment.LegacyTaskAssignor; import org.apache.kafka.streams.state.HostInfo; import org.apache.kafka.test.MockApiProcessorSupplier; import org.apache.kafka.test.MockClientSupplier; @@ -237,7 +237,7 @@ public class StreamsPartitionAssignorTest { @Captor private ArgumentCaptor> topicPartitionInfoCaptor; private final Map subscriptions = new HashMap<>(); - private final Class internalTaskAssignor; + private final Class internalTaskAssignor; private final Class customTaskAssignor; private final String rackAwareAssignorStrategy; private Map clientTags; @@ -354,7 +354,7 @@ public static Collection parameters() { ); } - public StreamsPartitionAssignorTest(final Class internalTaskAssignor, + public StreamsPartitionAssignorTest(final Class internalTaskAssignor, final boolean enableRackAwareAssignor, final Class customTaskAssignor) { this.internalTaskAssignor = internalTaskAssignor;