Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@
import org.apache.kafka.streams.processor.internals.assignment.ReferenceContainer;
import org.apache.kafka.streams.processor.internals.assignment.StickyTaskAssignor;
import org.apache.kafka.streams.processor.internals.assignment.SubscriptionInfo;
import org.apache.kafka.streams.processor.internals.assignment.TaskAssignor;
import org.apache.kafka.streams.processor.internals.assignment.LegacyTaskAssignor;
import org.apache.kafka.streams.processor.internals.assignment.DefaultTaskInfo;
import org.apache.kafka.streams.state.HostInfo;
import org.slf4j.Logger;
Expand Down Expand Up @@ -224,7 +224,7 @@ public interface UserTaskAssignmentListener {

private Supplier<Optional<org.apache.kafka.streams.processor.assignment.TaskAssignor>>
customTaskAssignorSupplier;
private Supplier<TaskAssignor> internalTaskAssignorSupplier;
private Supplier<LegacyTaskAssignor> legacyTaskAssignorSupplier;
private byte uniqueField;
private Map<String, String> clientTags;

Expand Down Expand Up @@ -259,7 +259,7 @@ public void configure(final Map<String, ?> configs) {
copartitionedTopicsEnforcer = assignorConfiguration.copartitionedTopicsEnforcer();
rebalanceProtocol = assignorConfiguration.rebalanceProtocol();
customTaskAssignorSupplier = assignorConfiguration::customTaskAssignor;
internalTaskAssignorSupplier = assignorConfiguration::taskAssignor;
legacyTaskAssignorSupplier = assignorConfiguration::taskAssignor;
assignmentListener = assignorConfiguration.assignmentListener();
uniqueField = 0;
clientTags = referenceContainer.clientTags;
Expand Down Expand Up @@ -817,7 +817,7 @@ private UserTaskAssignmentListener assignTasksToClients(final Cluster fullMetada
};
} else {
customTaskAssignmentListener = (assignment, subscription) -> { };
final TaskAssignor taskAssignor = createTaskAssignor(lagComputationSuccessful);
final LegacyTaskAssignor taskAssignor = createTaskAssignor(lagComputationSuccessful);
final RackAwareTaskAssignor rackAwareTaskAssignor = new RackAwareTaskAssignor(
fullMetadata,
partitionsForTask,
Expand Down Expand Up @@ -859,8 +859,8 @@ private UserTaskAssignmentListener assignTasksToClients(final Cluster fullMetada
return customTaskAssignmentListener;
}

private TaskAssignor createTaskAssignor(final boolean lagComputationSuccessful) {
final TaskAssignor taskAssignor = internalTaskAssignorSupplier.get();
private LegacyTaskAssignor createTaskAssignor(final boolean lagComputationSuccessful) {
final LegacyTaskAssignor taskAssignor = legacyTaskAssignorSupplier.get();
if (taskAssignor instanceof StickyTaskAssignor) {
// special case: to preserve pre-existing behavior, we invoke the StickyTaskAssignor
// whether or not lag computation failed.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -242,9 +242,9 @@ public AssignmentConfigs assignmentConfigs() {
return AssignmentConfigs.of(streamsConfig);
}

public TaskAssignor taskAssignor() {
public LegacyTaskAssignor taskAssignor() {
try {
return Utils.newInstance(internalTaskAssignorClass, TaskAssignor.class);
return Utils.newInstance(internalTaskAssignorClass, LegacyTaskAssignor.class);
} catch (final ClassNotFoundException e) {
throw new IllegalArgumentException(
"Expected an instantiable class name for " + INTERNAL_TASK_ASSIGNOR_CLASS,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
* 1. ignore the task lags in the ClientState map
* 2. always return true, indicating that a follow-up rebalance is needed
*/
public class FallbackPriorTaskAssignor implements TaskAssignor {
public class FallbackPriorTaskAssignor implements LegacyTaskAssignor {
private final StickyTaskAssignor delegate;

public FallbackPriorTaskAssignor() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@
import static org.apache.kafka.streams.processor.internals.assignment.TaskMovement.assignActiveTaskMovements;
import static org.apache.kafka.streams.processor.internals.assignment.TaskMovement.assignStandbyTaskMovements;

public class HighAvailabilityTaskAssignor implements TaskAssignor {
public class HighAvailabilityTaskAssignor implements LegacyTaskAssignor {
private static final Logger log = LoggerFactory.getLogger(HighAvailabilityTaskAssignor.class);
public static final int DEFAULT_HIGH_AVAILABILITY_TRAFFIC_COST = 10;
public static final int DEFAULT_HIGH_AVAILABILITY_NON_OVERLAP_COST = 1;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
import org.apache.kafka.streams.processor.assignment.AssignmentConfigs;
import org.apache.kafka.streams.processor.assignment.ProcessId;

public interface TaskAssignor {
public interface LegacyTaskAssignor {
/**
* @return whether the generated assignment requires a followup probing rebalance to satisfy all conditions
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
import org.apache.kafka.streams.processor.assignment.AssignmentConfigs;
import org.apache.kafka.streams.processor.assignment.ProcessId;

interface StandbyTaskAssignor extends TaskAssignor {
interface StandbyTaskAssignor extends LegacyTaskAssignor {
default boolean isAllowedTaskMovement(final ClientState source, final ClientState destination) {
return true;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@
import java.util.Objects;
import java.util.Set;

public class StickyTaskAssignor implements TaskAssignor {
public class StickyTaskAssignor implements LegacyTaskAssignor {

private static final Logger log = LoggerFactory.getLogger(StickyTaskAssignor.class);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
import org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor;
import org.apache.kafka.streams.processor.internals.assignment.AssignorConfiguration.AssignmentListener;
import org.apache.kafka.streams.processor.internals.assignment.HighAvailabilityTaskAssignor;
import org.apache.kafka.streams.processor.internals.assignment.TaskAssignor;
import org.apache.kafka.streams.processor.internals.assignment.LegacyTaskAssignor;
import org.apache.kafka.test.IntegrationTest;
import org.apache.kafka.test.TestUtils;
import org.junit.AfterClass;
Expand Down Expand Up @@ -78,7 +78,8 @@ public static void closeCluster() {
public TestName testName = new TestName();

// Just a dummy implementation so we can check the config
public static final class MyTaskAssignor extends HighAvailabilityTaskAssignor implements TaskAssignor { }
public static final class MyLegacyTaskAssignor extends HighAvailabilityTaskAssignor implements
LegacyTaskAssignor { }

@SuppressWarnings("unchecked")
@Test
Expand Down Expand Up @@ -116,7 +117,7 @@ public void shouldProperlyConfigureTheAssignor() throws NoSuchFieldException, Il
mkEntry(StreamsConfig.MAX_WARMUP_REPLICAS_CONFIG, "7"),
mkEntry(StreamsConfig.PROBING_REBALANCE_INTERVAL_MS_CONFIG, "480000"),
mkEntry(StreamsConfig.InternalConfig.ASSIGNMENT_LISTENER, configuredAssignmentListener),
mkEntry(StreamsConfig.InternalConfig.INTERNAL_TASK_ASSIGNOR_CLASS, MyTaskAssignor.class.getName())
mkEntry(StreamsConfig.InternalConfig.INTERNAL_TASK_ASSIGNOR_CLASS, MyLegacyTaskAssignor.class.getName())
)
);

Expand Down Expand Up @@ -153,18 +154,18 @@ public void shouldProperlyConfigureTheAssignor() throws NoSuchFieldException, Il
assignmentListenerField.setAccessible(true);
final AssignmentListener actualAssignmentListener = (AssignmentListener) assignmentListenerField.get(streamsPartitionAssignor);

final Field taskAssignorSupplierField = StreamsPartitionAssignor.class.getDeclaredField("internalTaskAssignorSupplier");
final Field taskAssignorSupplierField = StreamsPartitionAssignor.class.getDeclaredField("legacyTaskAssignorSupplier");
taskAssignorSupplierField.setAccessible(true);
final Supplier<TaskAssignor> taskAssignorSupplier =
(Supplier<TaskAssignor>) taskAssignorSupplierField.get(streamsPartitionAssignor);
final TaskAssignor taskAssignor = taskAssignorSupplier.get();
final Supplier<LegacyTaskAssignor> taskAssignorSupplier =
(Supplier<LegacyTaskAssignor>) taskAssignorSupplierField.get(streamsPartitionAssignor);
final LegacyTaskAssignor taskAssignor = taskAssignorSupplier.get();

assertThat(configs.numStandbyReplicas(), is(5));
assertThat(configs.acceptableRecoveryLag(), is(6L));
assertThat(configs.maxWarmupReplicas(), is(7));
assertThat(configs.probingRebalanceIntervalMs(), is(480000L));
assertThat(actualAssignmentListener, sameInstance(configuredAssignmentListener));
assertThat(taskAssignor, instanceOf(MyTaskAssignor.class));
assertThat(taskAssignor, instanceOf(MyLegacyTaskAssignor.class));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@
import org.apache.kafka.streams.processor.internals.assignment.HighAvailabilityTaskAssignor;
import org.apache.kafka.streams.processor.internals.assignment.ReferenceContainer;
import org.apache.kafka.streams.processor.internals.assignment.StickyTaskAssignor;
import org.apache.kafka.streams.processor.internals.assignment.TaskAssignor;
import org.apache.kafka.streams.processor.internals.assignment.LegacyTaskAssignor;
import org.apache.kafka.test.IntegrationTest;
import org.apache.kafka.test.MockApiProcessorSupplier;
import org.apache.kafka.test.MockClientSupplier;
Expand Down Expand Up @@ -150,7 +150,7 @@ private void completeLargeAssignment(final int numPartitions,
final int numClients,
final int numThreadsPerClient,
final int numStandbys,
final Class<? extends TaskAssignor> taskAssignor) {
final Class<? extends LegacyTaskAssignor> taskAssignor) {
final List<String> topic = singletonList("topic");

final Map<TopicPartition, Long> changelogEndOffsets = new HashMap<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@
import org.apache.kafka.streams.processor.internals.assignment.ReferenceContainer;
import org.apache.kafka.streams.processor.internals.assignment.StickyTaskAssignor;
import org.apache.kafka.streams.processor.internals.assignment.SubscriptionInfo;
import org.apache.kafka.streams.processor.internals.assignment.TaskAssignor;
import org.apache.kafka.streams.processor.internals.assignment.LegacyTaskAssignor;
import org.apache.kafka.streams.state.HostInfo;
import org.apache.kafka.test.MockApiProcessorSupplier;
import org.apache.kafka.test.MockClientSupplier;
Expand Down Expand Up @@ -237,7 +237,7 @@ public class StreamsPartitionAssignorTest {
@Captor
private ArgumentCaptor<Map<TopicPartition, PartitionInfo>> topicPartitionInfoCaptor;
private final Map<String, Subscription> subscriptions = new HashMap<>();
private final Class<? extends TaskAssignor> internalTaskAssignor;
private final Class<? extends LegacyTaskAssignor> internalTaskAssignor;
private final Class<? extends org.apache.kafka.streams.processor.assignment.TaskAssignor> customTaskAssignor;
private final String rackAwareAssignorStrategy;
private Map<String, String> clientTags;
Expand Down Expand Up @@ -354,7 +354,7 @@ public static Collection<Object[]> parameters() {
);
}

public StreamsPartitionAssignorTest(final Class<? extends TaskAssignor> internalTaskAssignor,
public StreamsPartitionAssignorTest(final Class<? extends LegacyTaskAssignor> internalTaskAssignor,
final boolean enableRackAwareAssignor,
final Class<? extends org.apache.kafka.streams.processor.assignment.TaskAssignor> customTaskAssignor) {
this.internalTaskAssignor = internalTaskAssignor;
Expand Down