Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,6 @@
import java.util.OptionalInt;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.processor.assignment.assignors.StickyTaskAssignor;
import org.apache.kafka.streams.processor.internals.assignment.HighAvailabilityTaskAssignor;

/**
* Assignment related configs for the Kafka Streams {@link TaskAssignor}.
Expand All @@ -43,26 +41,8 @@ public static AssignmentConfigs of(final StreamsConfig configs) {
final long probingRebalanceIntervalMs = configs.getLong(StreamsConfig.PROBING_REBALANCE_INTERVAL_MS_CONFIG);
final List<String> rackAwareAssignmentTags = configs.getList(StreamsConfig.RACK_AWARE_ASSIGNMENT_TAGS_CONFIG);
final String rackAwareAssignmentStrategy = configs.getString(StreamsConfig.RACK_AWARE_ASSIGNMENT_STRATEGY_CONFIG);
Integer rackAwareTrafficCost = configs.getInt(StreamsConfig.RACK_AWARE_ASSIGNMENT_TRAFFIC_COST_CONFIG);
Integer rackAwareNonOverlapCost = configs.getInt(StreamsConfig.RACK_AWARE_ASSIGNMENT_NON_OVERLAP_COST_CONFIG);

final String assignorClassName = configs.getString(StreamsConfig.TASK_ASSIGNOR_CLASS_CONFIG);
if (StickyTaskAssignor.class.getName().equals(assignorClassName)) {
if (rackAwareTrafficCost == null) {
rackAwareTrafficCost = StickyTaskAssignor.DEFAULT_STICKY_TRAFFIC_COST;
}
if (rackAwareNonOverlapCost == null) {
rackAwareNonOverlapCost = StickyTaskAssignor.DEFAULT_STICKY_NON_OVERLAP_COST;
}
} else if (HighAvailabilityTaskAssignor.class.getName().equals(assignorClassName)) {
// TODO KAFKA-16869: replace with the HighAvailabilityTaskAssignor class once it implements the new TaskAssignor interface
if (rackAwareTrafficCost == null) {
rackAwareTrafficCost = HighAvailabilityTaskAssignor.DEFAULT_HIGH_AVAILABILITY_TRAFFIC_COST;
}
if (rackAwareNonOverlapCost == null) {
rackAwareNonOverlapCost = HighAvailabilityTaskAssignor.DEFAULT_HIGH_AVAILABILITY_NON_OVERLAP_COST;
}
}
final Integer rackAwareTrafficCost = configs.getInt(StreamsConfig.RACK_AWARE_ASSIGNMENT_TRAFFIC_COST_CONFIG);
final Integer rackAwareNonOverlapCost = configs.getInt(StreamsConfig.RACK_AWARE_ASSIGNMENT_NON_OVERLAP_COST_CONFIG);

return new AssignmentConfigs(
acceptableRecoveryLag,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import static java.util.Collections.unmodifiableMap;

import java.time.Instant;
import java.util.Arrays;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
Expand Down Expand Up @@ -108,6 +109,16 @@ public Optional<Instant> followupRebalanceDeadline() {
return followupRebalanceDeadline;
}

/**
 * Returns a human-readable summary of this assignment: the process id, the
 * collection of assigned tasks, and the optional follow-up rebalance deadline.
 */
@Override
public String toString() {
    // Snapshot the task collection into an array so Arrays.toString renders
    // it in the familiar "[a, b, c]" form.
    final AssignedTask[] assignedTasks = tasks.values().toArray(new AssignedTask[0]);
    return "KafkaStreamsAssignment{"
        + processId
        + ", "
        + Arrays.toString(assignedTasks)
        + ", "
        + followupRebalanceDeadline
        + "}";
}

public static class AssignedTask {
private final TaskId id;
private final Type taskType;
Expand Down Expand Up @@ -157,5 +168,10 @@ public boolean equals(final Object obj) {
final AssignedTask other = (AssignedTask) obj;
return this.id.equals(other.id()) && this.taskType == other.taskType;
}

/** Returns a readable {@code AssignedTask{TYPE, taskId}} representation. */
@Override
public String toString() {
    return "AssignedTask{" + taskType + ", " + id + "}";
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,14 @@ public UUID id() {
return id;
}

/**
 * Creates a new {@code ProcessId} that wraps a randomly generated UUID.
 *
 * @return a randomly generated process id
 */
public static ProcessId randomProcessId() {
return new ProcessId(UUID.randomUUID());
}

@Override
public String toString() {
return id.toString();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,16 +59,16 @@ private TaskAssignmentUtils() {}
* A simple config container for necessary parameters and optional overrides to apply when
* running the active or standby task rack-aware optimizations.
*/
public static class RackAwareOptimizationParams {
public static final class RackAwareOptimizationParams {
private final ApplicationState applicationState;
private final Optional<Integer> trafficCostOverride;
private final Optional<Integer> nonOverlapCostOverride;
private final Optional<SortedSet<TaskId>> tasksToOptimize;

private RackAwareOptimizationParams(final ApplicationState applicationState,
final Optional<Integer> trafficCostOverride,
final Optional<Integer> nonOverlapCostOverride,
final Optional<SortedSet<TaskId>> tasksToOptimize) {
final Optional<Integer> trafficCostOverride,
final Optional<Integer> nonOverlapCostOverride,
final Optional<SortedSet<TaskId>> tasksToOptimize) {
this.applicationState = applicationState;
this.trafficCostOverride = trafficCostOverride;
this.nonOverlapCostOverride = nonOverlapCostOverride;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,10 +95,15 @@ private void optimizeActive(final ApplicationState applicationState,

final Map<ProcessId, KafkaStreamsAssignment> currentAssignments = assignmentState.newAssignments;

TaskAssignmentUtils.optimizeRackAwareActiveTasks(
RackAwareOptimizationParams.of(applicationState).forStatefulTasks(),
currentAssignments
);
final RackAwareOptimizationParams statefulTaskParams = RackAwareOptimizationParams.of(applicationState)
.withTrafficCostOverride(
applicationState.assignmentConfigs().rackAwareTrafficCost().orElse(DEFAULT_STICKY_TRAFFIC_COST)
)
.withNonOverlapCostOverride(
applicationState.assignmentConfigs().rackAwareNonOverlapCost().orElse(DEFAULT_STICKY_NON_OVERLAP_COST)
)
.forStatefulTasks();
TaskAssignmentUtils.optimizeRackAwareActiveTasks(statefulTaskParams, currentAssignments);

TaskAssignmentUtils.optimizeRackAwareActiveTasks(
RackAwareOptimizationParams.of(applicationState)
Expand All @@ -120,7 +125,15 @@ private void optimizeStandby(final ApplicationState applicationState, final Assi
}

final Map<ProcessId, KafkaStreamsAssignment> assignments = assignmentState.newAssignments;
TaskAssignmentUtils.optimizeRackAwareStandbyTasks(RackAwareOptimizationParams.of(applicationState), assignments);

final RackAwareOptimizationParams optimizationParams = RackAwareOptimizationParams.of(applicationState)
.withTrafficCostOverride(
applicationState.assignmentConfigs().rackAwareTrafficCost().orElse(DEFAULT_STICKY_TRAFFIC_COST)
)
.withNonOverlapCostOverride(
applicationState.assignmentConfigs().rackAwareNonOverlapCost().orElse(DEFAULT_STICKY_NON_OVERLAP_COST)
);
TaskAssignmentUtils.optimizeRackAwareStandbyTasks(optimizationParams, assignments);
assignmentState.processOptimizedAssignments(assignments);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,6 @@

import static java.util.Collections.unmodifiableSet;
import static java.util.Map.Entry.comparingByKey;
import static java.util.UUID.randomUUID;
import static org.apache.kafka.common.utils.Utils.filterMap;
import static org.apache.kafka.streams.processor.internals.ClientUtils.fetchCommittedOffsets;
import static org.apache.kafka.streams.processor.internals.ClientUtils.fetchEndOffsetsResult;
Expand Down Expand Up @@ -197,7 +196,7 @@ public interface UserTaskAssignmentListener {
}

// keep track of any future consumers in a "dummy" Client since we can't decipher their subscription
private static final ProcessId FUTURE_ID = new ProcessId(randomUUID());
private static final ProcessId FUTURE_ID = ProcessId.randomProcessId();

protected static final Comparator<TopicPartition> PARTITION_COMPARATOR =
Comparator.comparing(TopicPartition::topic).thenComparingInt(TopicPartition::partition);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@

import java.util.Arrays;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClient;
Expand Down Expand Up @@ -60,20 +59,12 @@
import org.apache.kafka.streams.kstream.internals.InternalStreamsBuilder;
import org.apache.kafka.streams.kstream.internals.MaterializedInternal;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.assignment.ApplicationState;
import org.apache.kafka.streams.processor.assignment.AssignmentConfigs;
import org.apache.kafka.streams.processor.assignment.KafkaStreamsAssignment;
import org.apache.kafka.streams.processor.assignment.ProcessId;
import org.apache.kafka.streams.processor.assignment.TaskAssignmentUtils;
import org.apache.kafka.streams.processor.assignment.TaskInfo;
import org.apache.kafka.streams.processor.internals.TopologyMetadata.Subtopology;
import org.apache.kafka.streams.processor.internals.assignment.AssignmentInfo;
import org.apache.kafka.streams.processor.internals.assignment.AssignorConfiguration;
import org.apache.kafka.streams.processor.internals.assignment.AssignorError;
import org.apache.kafka.streams.processor.internals.assignment.ClientState;
import org.apache.kafka.streams.processor.internals.assignment.DefaultApplicationState;
import org.apache.kafka.streams.processor.internals.assignment.DefaultTaskInfo;
import org.apache.kafka.streams.processor.internals.assignment.DefaultTaskTopicPartition;
import org.apache.kafka.streams.processor.internals.assignment.FallbackPriorTaskAssignor;
import org.apache.kafka.streams.processor.internals.assignment.HighAvailabilityTaskAssignor;
import org.apache.kafka.streams.processor.internals.assignment.ReferenceContainer;
Expand Down Expand Up @@ -2616,123 +2607,6 @@ public void testClientTags() {
assertEquals(clientTags, partitionAssignor.clientTags());
}

@Test
// Exercises TaskAssignmentUtils.validateTaskAssignment against one valid
// assignment and four invalid ones, asserting the specific AssignmentError
// reported for each.
public void testValidateTaskAssignment() {
createDefaultMockTaskManager();
configureDefaultPartitionAssignor();

final StreamsConfig streamsConfig = new StreamsConfig(configProps());
final AssignmentConfigs assignmentConfigs = AssignmentConfigs.of(streamsConfig);
// Single known task 1_1 reading from partition t1-1; this is the entire
// universe of valid task ids for the validation checks below.
final Set<TaskInfo> tasks = mkSet(
new DefaultTaskInfo(
new TaskId(1, 1),
false,
mkSet(),
mkSet(
new DefaultTaskTopicPartition(
new TopicPartition("t1", 1),
true,
false,
() -> { }
)
)
)
);

final ProcessId clientUuid1 = new ProcessId(UUID.randomUUID());
final ProcessId clientUuid2 = new ProcessId(UUID.randomUUID());
// NOTE(review): both ClientMetadata instances are constructed with clientUuid1;
// the second entry likely intended clientUuid2 — confirm whether deliberate.
final Map<ProcessId, StreamsPartitionAssignor.ClientMetadata> clients = mkMap(
mkEntry(clientUuid1, new StreamsPartitionAssignor.ClientMetadata(clientUuid1, "endpoint1:80", mkMap(), Optional.empty())),
mkEntry(clientUuid2, new StreamsPartitionAssignor.ClientMetadata(clientUuid1, "endpoint2:80", mkMap(), Optional.empty()))
);
final ApplicationState applicationState = new DefaultApplicationState(
assignmentConfigs,
tasks.stream().collect(Collectors.toMap(
TaskInfo::id,
t -> t
)),
clients
);

// Case 1: every client covered and only known tasks assigned -> NONE.
final org.apache.kafka.streams.processor.assignment.TaskAssignor.TaskAssignment noError = new org.apache.kafka.streams.processor.assignment.TaskAssignor.TaskAssignment(
mkSet(
KafkaStreamsAssignment.of(clientUuid1, mkSet(
new KafkaStreamsAssignment.AssignedTask(
new TaskId(1, 1), KafkaStreamsAssignment.AssignedTask.Type.ACTIVE
)
)),
KafkaStreamsAssignment.of(clientUuid2, mkSet())
)
);
org.apache.kafka.streams.processor.assignment.TaskAssignor.AssignmentError error = TaskAssignmentUtils.validateTaskAssignment(applicationState, noError);
assertEquals(org.apache.kafka.streams.processor.assignment.TaskAssignor.AssignmentError.NONE, error);

// Case 2: clientUuid2 receives no assignment at all -> MISSING_PROCESS_ID.
final org.apache.kafka.streams.processor.assignment.TaskAssignor.TaskAssignment missingProcessId = new org.apache.kafka.streams.processor.assignment.TaskAssignor.TaskAssignment(
mkSet(
KafkaStreamsAssignment.of(clientUuid1, mkSet(
new KafkaStreamsAssignment.AssignedTask(
new TaskId(1, 1), KafkaStreamsAssignment.AssignedTask.Type.ACTIVE
)
))
)
);
error = TaskAssignmentUtils.validateTaskAssignment(applicationState, missingProcessId);
assertEquals(org.apache.kafka.streams.processor.assignment.TaskAssignor.AssignmentError.MISSING_PROCESS_ID, error);

// Case 3: an assignment for a process id the application state does not
// know about -> UNKNOWN_PROCESS_ID.
final org.apache.kafka.streams.processor.assignment.TaskAssignor.TaskAssignment unknownProcessId = new org.apache.kafka.streams.processor.assignment.TaskAssignor.TaskAssignment(
mkSet(
KafkaStreamsAssignment.of(clientUuid1, mkSet(
new KafkaStreamsAssignment.AssignedTask(
new TaskId(1, 1), KafkaStreamsAssignment.AssignedTask.Type.ACTIVE
)
)),
KafkaStreamsAssignment.of(clientUuid2, mkSet()),
KafkaStreamsAssignment.of(new ProcessId(UUID.randomUUID()), mkSet())
)
);
error = TaskAssignmentUtils.validateTaskAssignment(applicationState, unknownProcessId);
assertEquals(org.apache.kafka.streams.processor.assignment.TaskAssignor.AssignmentError.UNKNOWN_PROCESS_ID, error);

// Case 4: task 13_13 is not among the known tasks -> UNKNOWN_TASK_ID.
final org.apache.kafka.streams.processor.assignment.TaskAssignor.TaskAssignment unknownTaskId = new org.apache.kafka.streams.processor.assignment.TaskAssignor.TaskAssignment(
mkSet(
KafkaStreamsAssignment.of(clientUuid1, mkSet(
new KafkaStreamsAssignment.AssignedTask(
new TaskId(1, 1), KafkaStreamsAssignment.AssignedTask.Type.ACTIVE
)
)),
KafkaStreamsAssignment.of(clientUuid2, mkSet(
new KafkaStreamsAssignment.AssignedTask(
new TaskId(13, 13), KafkaStreamsAssignment.AssignedTask.Type.ACTIVE
)
))
)
);
error = TaskAssignmentUtils.validateTaskAssignment(applicationState, unknownTaskId);
assertEquals(org.apache.kafka.streams.processor.assignment.TaskAssignor.AssignmentError.UNKNOWN_TASK_ID, error);

// Case 5: the same active task 1_1 assigned to both clients
// -> ACTIVE_TASK_ASSIGNED_MULTIPLE_TIMES.
final org.apache.kafka.streams.processor.assignment.TaskAssignor.TaskAssignment activeTaskDuplicated = new org.apache.kafka.streams.processor.assignment.TaskAssignor.TaskAssignment(
mkSet(
KafkaStreamsAssignment.of(clientUuid1, mkSet(
new KafkaStreamsAssignment.AssignedTask(
new TaskId(1, 1), KafkaStreamsAssignment.AssignedTask.Type.ACTIVE
)
)),
KafkaStreamsAssignment.of(clientUuid2, mkSet(
new KafkaStreamsAssignment.AssignedTask(
new TaskId(1, 1), KafkaStreamsAssignment.AssignedTask.Type.ACTIVE
)
))
)
);
error = TaskAssignmentUtils.validateTaskAssignment(applicationState, activeTaskDuplicated);
assertEquals(org.apache.kafka.streams.processor.assignment.TaskAssignor.AssignmentError.ACTIVE_TASK_ASSIGNED_MULTIPLE_TIMES, error);
}

private static class CorruptedInternalTopologyBuilder extends InternalTopologyBuilder {
private Map<Subtopology, TopicsInfo> corruptedTopicGroups;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
*/
package org.apache.kafka.streams.processor.internals;

import java.util.UUID;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.DeleteRecordsResult;
import org.apache.kafka.clients.admin.DeletedRecords;
Expand Down Expand Up @@ -240,7 +239,7 @@ private TaskManager setUpTaskManager(final ProcessingMode processingMode,
final TaskManager taskManager = new TaskManager(
time,
changeLogReader,
new ProcessId(UUID.randomUUID()),
ProcessId.randomProcessId(),
"taskManagerTest",
activeTaskCreator,
standbyTaskCreator,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.streams.processor.internals.assignment;

import static org.apache.kafka.common.utils.Utils.mkSet;
import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.TASK_0_0;
import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.TASK_0_1;
import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.TASK_0_2;
import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.processIdForInt;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.MatcherAssert.assertThat;

import org.apache.kafka.streams.processor.assignment.KafkaStreamsAssignment;
import org.apache.kafka.streams.processor.assignment.KafkaStreamsAssignment.AssignedTask;
import org.junit.Test;

/**
 * Verifies that {@link KafkaStreamsAssignment#toString()} produces a readable
 * summary of the process id, the assigned tasks, and the follow-up rebalance
 * deadline.
 */
public class KafkaStreamsAssignmentTest {
@Test
public void shouldHaveReadableString() {
// Assignment for process ...0001 holding two active tasks and one standby.
final KafkaStreamsAssignment assignment = KafkaStreamsAssignment.of(
processIdForInt(1),
mkSet(
new AssignedTask(TASK_0_0, AssignedTask.Type.ACTIVE),
new AssignedTask(TASK_0_1, AssignedTask.Type.STANDBY),
new AssignedTask(TASK_0_2, AssignedTask.Type.ACTIVE)
)
);

// NOTE(review): the expected string pins an exact task ordering (0_2, 0_1,
// 0_0), which appears to depend on the iteration order of the assignment's
// internal task collection — confirm that order is deterministic, otherwise
// this assertion is fragile.
assertThat(
assignment.toString(),
equalTo("KafkaStreamsAssignment{00000000-0000-0000-0000-000000000001, "
+ "[AssignedTask{ACTIVE, 0_2}, AssignedTask{STANDBY, 0_1}, AssignedTask{ACTIVE, 0_0}], "
+ "Optional.empty}"));
}
}
Loading