From c937bfb6e4ff1a3f1ebb558ba7a9aad2baf9fdd4 Mon Sep 17 00:00:00 2001 From: ableegoldman Date: Wed, 11 Mar 2020 15:38:26 -0700 Subject: [PATCH 1/7] get admin from assignor config and add offset sums to ClientState fetch end offsets add lag to each clienttate build ranks add ClientState tests add tests for building client rankings fall back to prev assignment if fetching offsets fails --- .../org/apache/kafka/clients/admin/Admin.java | 2 +- .../org/apache/kafka/common/utils/Utils.java | 20 +- .../apache/kafka/streams/KafkaStreams.java | 23 ++- .../internals/StreamsPartitionAssignor.java | 195 ++++++++++++++---- .../assignment/AssignorConfiguration.java | 8 +- .../internals/assignment/ClientState.java | 76 ++++++- .../assignment/SubscriptionInfo.java | 4 +- .../kafka/streams/KafkaStreamsTest.java | 47 +++++ .../StreamsPartitionAssignorTest.java | 152 +++++++++++++- .../internals/assignment/ClientStateTest.java | 187 +++++++++++------ 10 files changed, 590 insertions(+), 124 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/Admin.java b/clients/src/main/java/org/apache/kafka/clients/admin/Admin.java index b99fd5ff58c39..f10679bd4dc63 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/Admin.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/Admin.java @@ -1119,7 +1119,7 @@ default AlterConsumerGroupOffsetsResult alterConsumerGroupOffsets(String groupId * @param topicPartitionOffsets The mapping from partition to the OffsetSpec to look up. * @return The ListOffsetsResult. 
*/ - default ListOffsetsResult listOffsets(Map topicPartitionOffsets) { + default ListOffsetsResult listOffsets(Map topicPartitionOffsets) { return listOffsets(topicPartitionOffsets, new ListOffsetsOptions()); } diff --git a/clients/src/main/java/org/apache/kafka/common/utils/Utils.java b/clients/src/main/java/org/apache/kafka/common/utils/Utils.java index e9d4cc40eee34..52ad3db8c5544 100755 --- a/clients/src/main/java/org/apache/kafka/common/utils/Utils.java +++ b/clients/src/main/java/org/apache/kafka/common/utils/Utils.java @@ -16,6 +16,10 @@ */ package org.apache.kafka.common.utils; +import static java.util.Arrays.asList; + +import java.util.SortedSet; +import java.util.TreeSet; import org.apache.kafka.common.KafkaException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -660,7 +664,7 @@ public static ByteBuffer ensureCapacity(ByteBuffer existingBuffer, int newLength return existingBuffer; } - /* + /** * Creates a set * @param elems the elements * @param the type of element @@ -674,6 +678,20 @@ public static Set mkSet(T... elems) { return result; } + /** + * Creates a sorted set + * @param elems the elements + * @param the type of element, must be comparable + * @return SortedSet + */ + @SafeVarargs + public static > SortedSet mkSortedSet(T... 
elems) { + SortedSet result = new TreeSet<>(); + for (T elem : elems) + result.add(elem); + return result; + } + /** * Creates a map entry (for use with {@link Utils#mkMap(java.util.Map.Entry[])}) * diff --git a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java index 2807e62e1b7cc..48681802da203 100644 --- a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java +++ b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java @@ -1217,17 +1217,9 @@ public Map> allLocalStorePartitionLags() { } log.debug("Current changelog positions: {}", allChangelogPositions); - final Map allEndOffsets; - try { - allEndOffsets = adminClient.listOffsets( - allPartitions.stream() - .collect(Collectors.toMap(Function.identity(), tp -> OffsetSpec.latest())) - ).all().get(); - } catch (final RuntimeException | InterruptedException | ExecutionException e) { - throw new StreamsException("Unable to obtain end offsets from kafka", e); - } - + final Map allEndOffsets = fetchEndOffsets(allPartitions, adminClient); log.debug("Current end offsets :{}", allEndOffsets); + for (final Map.Entry entry : allEndOffsets.entrySet()) { // Avoiding an extra admin API lookup by computing lags for not-yet-started restorations // from zero instead of the real "earliest offset" for the changelog. 
@@ -1244,4 +1236,15 @@ public Map> allLocalStorePartitionLags() { return Collections.unmodifiableMap(localStorePartitionLags); } + + public static Map fetchEndOffsets(final Collection partitions, + final Admin adminClient) { + try { + return adminClient.listOffsets( + partitions.stream().collect(Collectors.toMap(Function.identity(), tp -> OffsetSpec.latest())) + ).all().get(); + } catch (final RuntimeException | InterruptedException | ExecutionException e) { + throw new StreamsException("Unable to obtain end offsets from kafka", e); + } + } } diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java index 2740deed8746a..6d89b50a64896 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java @@ -17,6 +17,10 @@ package org.apache.kafka.streams.processor.internals; import java.util.Objects; +import java.util.SortedSet; +import java.util.TreeSet; +import org.apache.kafka.clients.admin.Admin; +import org.apache.kafka.clients.admin.ListOffsetsResult.ListOffsetsResultInfo; import org.apache.kafka.clients.consumer.ConsumerGroupMetadata; import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor; import org.apache.kafka.common.Cluster; @@ -30,6 +34,7 @@ import org.apache.kafka.streams.errors.StreamsException; import org.apache.kafka.streams.errors.TaskAssignmentException; import org.apache.kafka.streams.processor.TaskId; +import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder.TopicsInfo; import org.apache.kafka.streams.processor.internals.assignment.AssignmentInfo; import org.apache.kafka.streams.processor.internals.assignment.AssignorConfiguration; import 
org.apache.kafka.streams.processor.internals.assignment.AssignorConfiguration.AssignmentConfigs; @@ -60,6 +65,7 @@ import java.util.stream.Collectors; import static java.util.UUID.randomUUID; +import static org.apache.kafka.streams.KafkaStreams.fetchEndOffsets; import static org.apache.kafka.streams.processor.internals.assignment.StreamsAssignmentProtocolVersions.EARLIEST_PROBEABLE_VERSION; import static org.apache.kafka.streams.processor.internals.assignment.StreamsAssignmentProtocolVersions.LATEST_SUPPORTED_VERSION; import static org.apache.kafka.streams.processor.internals.assignment.StreamsAssignmentProtocolVersions.UNKNOWN; @@ -124,9 +130,8 @@ void addConsumer(final String consumerMemberId, final List owned state.addOwnedPartitions(ownedPartitions, consumerMemberId); } - void addPreviousTasks(final SubscriptionInfo info) { - state.addPreviousActiveTasks(info.prevTasks()); - state.addPreviousStandbyTasks(info.standbyTasks()); + void addPreviousTasksAndOffsetSums(final Map taskOffsetSums) { + state.addPreviousTasksAndOffsetSums(taskOffsetSums); } @Override @@ -203,6 +208,7 @@ public int hashCode() { protected int usedSubscriptionMetadataVersion = LATEST_SUPPORTED_VERSION; + private Admin adminClient; private InternalTopicManager internalTopicManager; private CopartitionedTopicsEnforcer copartitionedTopicsEnforcer; private RebalanceProtocol rebalanceProtocol; @@ -228,6 +234,7 @@ public void configure(final Map configs) { assignmentConfigs = assignorConfiguration.getAssignmentConfigs(); partitionGrouper = assignorConfiguration.getPartitionGrouper(); userEndPoint = assignorConfiguration.getUserEndPoint(); + adminClient = assignorConfiguration.getAdminClient(); internalTopicManager = assignorConfiguration.getInternalTopicManager(); copartitionedTopicsEnforcer = assignorConfiguration.getCopartitionedTopicsEnforcer(); rebalanceProtocol = assignorConfiguration.rebalanceProtocol(); @@ -350,7 +357,7 @@ public GroupAssignment assign(final Cluster metadata, final 
GroupSubscription gr // add the consumer and any info in its subscription to the client clientMetadata.addConsumer(consumerId, subscription.ownedPartitions()); allOwnedPartitions.addAll(subscription.ownedPartitions()); - clientMetadata.addPreviousTasks(info); + clientMetadata.addPreviousTasksAndOffsetSums(info.taskOffsetSums()); } final boolean versionProbing = @@ -363,7 +370,7 @@ public GroupAssignment assign(final Cluster metadata, final GroupSubscription gr // parse the topology to determine the repartition source topics, // making sure they are created with the number of partitions as // the maximum of the depending sub-topologies source topics' number of partitions - final Map topicGroups = taskManager.builder().topicGroups(); + final Map topicGroups = taskManager.builder().topicGroups(); final Map allRepartitionTopicPartitions; try { @@ -385,7 +392,7 @@ public GroupAssignment assign(final Cluster metadata, final GroupSubscription gr final Set allSourceTopics = new HashSet<>(); final Map> sourceTopicsByGroup = new HashMap<>(); - for (final Map.Entry entry : topicGroups.entrySet()) { + for (final Map.Entry entry : topicGroups.entrySet()) { allSourceTopics.addAll(entry.getValue().sourceTopics); sourceTopicsByGroup.put(entry.getKey(), entry.getValue().sourceTopics); } @@ -482,10 +489,10 @@ private boolean checkMetadataVersions(final int minReceivedMetadataVersion, * @return a map of repartition topics and their metadata * @throws TaskAssignmentException if there is incomplete source topic metadata due to missing source topic(s) */ - private Map computeRepartitionTopicMetadata(final Map topicGroups, + private Map computeRepartitionTopicMetadata(final Map topicGroups, final Cluster metadata) throws TaskAssignmentException { final Map repartitionTopicMetadata = new HashMap<>(); - for (final InternalTopologyBuilder.TopicsInfo topicsInfo : topicGroups.values()) { + for (final TopicsInfo topicsInfo : topicGroups.values()) { for (final String topic : 
topicsInfo.sourceTopics) { if (!topicsInfo.repartitionSourceTopics.keySet().contains(topic) && !metadata.topics().contains(topic)) { @@ -507,7 +514,7 @@ private Map computeRepartitionTopicMetadata(final M * * @return map from repartition topic to its partition info */ - private Map prepareRepartitionTopics(final Map topicGroups, + private Map prepareRepartitionTopics(final Map topicGroups, final Cluster metadata) { final Map repartitionTopicMetadata = computeRepartitionTopicMetadata(topicGroups, metadata); @@ -543,13 +550,13 @@ private Map prepareRepartitionTopics(final Map repartitionTopicMetadata, - final Map topicGroups, + final Map topicGroups, final Cluster metadata) { boolean numPartitionsNeeded; do { numPartitionsNeeded = false; - for (final InternalTopologyBuilder.TopicsInfo topicsInfo : topicGroups.values()) { + for (final TopicsInfo topicsInfo : topicGroups.values()) { for (final String topicName : topicsInfo.repartitionSourceTopics.keySet()) { final Optional maybeNumPartitions = repartitionTopicMetadata.get(topicName) .numberOfPartitions(); @@ -557,7 +564,7 @@ private void setRepartitionTopicMetadataNumberOfPartitions(final Map otherSinkTopics = otherTopicsInfo.sinkTopics; if (otherSinkTopics.contains(topicName)) { @@ -670,17 +677,17 @@ private void checkAllPartitions(final Set allSourceTopics, /** * Resolve changelog topic metadata and create them if necessary. 
* - * @return set of standby task ids (any task that is stateful and has logging enabled) + * @return mapping of stateful tasks to their set of changelog topics */ - private Set prepareChangelogTopics(final Map topicGroups, - final Map> tasksForTopicGroup) { - final Set standbyTaskIds = new HashSet<>(); + private Map> prepareChangelogTopics(final Map topicGroups, + final Map> tasksForTopicGroup) { + final Map> changelogsByStatefulTask = new HashMap<>(); // add tasks to state change log topic subscribers final Map changelogTopicMetadata = new HashMap<>(); - for (final Map.Entry entry : topicGroups.entrySet()) { + for (final Map.Entry entry : topicGroups.entrySet()) { final int topicGroupId = entry.getKey(); - final InternalTopologyBuilder.TopicsInfo topicsInfo = entry.getValue(); + final TopicsInfo topicsInfo = entry.getValue(); final Set topicGroupTasks = tasksForTopicGroup.get(topicGroupId); if (topicGroupTasks == null) { @@ -690,7 +697,15 @@ private Set prepareChangelogTopics(final Map new TopicPartition(topic, task.partition)) + .collect(Collectors.toSet())); + } for (final InternalTopicConfig topicConfig : topicsInfo.nonSourceChangelogTopics()) { // the expected number of partitions is the max value of TaskId.partition + 1 @@ -707,33 +722,88 @@ private Set prepareChangelogTopics(final Map allSourceTopics, final Map> partitionsForTask, - final Map topicGroups, + final Map topicGroups, final Map clientMetadataMap, final Cluster fullMetadata) { final Map taskForPartition = new HashMap<>(); final Map> tasksForTopicGroup = new HashMap<>(); populateTasksForMaps(taskForPartition, tasksForTopicGroup, allSourceTopics, partitionsForTask, fullMetadata); - final Set standbyTaskIds = prepareChangelogTopics(topicGroups, tasksForTopicGroup); + final Map> changelogsByStatefulTask = + prepareChangelogTopics(topicGroups, tasksForTopicGroup); + + final Map clientStates = new HashMap<>(); + final boolean lagComputationSuccessful = + populateClientStatesMap(clientStates, 
clientMetadataMap, taskForPartition, changelogsByStatefulTask); + + final Map>> statefulTasksToRankedCandidates = + buildClientRankingsByTask(changelogsByStatefulTask.keySet(), clientStates, assignmentConfigs.acceptableRecoveryLag); + + // assign tasks to clients + if (lagComputationSuccessful) { + log.debug("Assigning tasks {} to clients {} with number of replicas {}", + partitionsForTask.keySet(), clientStates, assignmentConfigs.numStandbyReplicas); + final StickyTaskAssignor taskAssignor = + new StickyTaskAssignor<>(clientStates, partitionsForTask.keySet(), statefulTasksToRankedCandidates.keySet()); + taskAssignor.assign(assignmentConfigs.numStandbyReplicas); + } else { + log.debug("Failed to fetch end offsets and compute task lags, will return tasks to previous owners then retry"); + // give tasks back to previous owners (based on prevActiveTasks and prevStandbyTasks), distribute any "unowned" tasks + //TODO-soph + } + + log.info("Assigned tasks to clients as {}{}.", + Utils.NL, clientStates.entrySet().stream().map(Map.Entry::toString).collect(Collectors.joining(Utils.NL))); + } + + /** + * Builds a map from client to state, and readies each ClientState for assignment by adding any missing prev tasks + * and computing the per-task overall lag based on the fetched end offsets for each changelog. + * + * @param clientStates a map from each client to its state, including offset lags. Populated by this method. 
+ * @param clientMetadataMap a map from each client to its full metadata + * @param taskForPartition map from topic partition to its corresponding task + * @param changelogsByStatefulTask map from each stateful task to its set of changelog topic partitions + * + * @return whether we were able to successfully fetch the changelog end offsets and compute each client's lag + */ + private boolean populateClientStatesMap(final Map clientStates, + final Map clientMetadataMap, + final Map taskForPartition, + final Map> changelogsByStatefulTask) { + boolean fetchEndOffsetsSuccessful; + Map allTaskEndOffsetSums; + try { + final Collection allChangelogPartitions = changelogsByStatefulTask.values().stream() + .flatMap(Collection::stream) + .collect(Collectors.toList()); + final Map endOffsets = fetchEndOffsets(allChangelogPartitions, adminClient); + allTaskEndOffsetSums = computeEndOffsetSumsByTask(endOffsets, changelogsByStatefulTask); + + fetchEndOffsetsSuccessful = true; + } catch(final StreamsException e) { + allTaskEndOffsetSums = null; + fetchEndOffsetsSuccessful = false; + } - final Map states = new HashMap<>(); for (final Map.Entry entry : clientMetadataMap.entrySet()) { final UUID uuid = entry.getKey(); final ClientState state = entry.getValue().state; - states.put(uuid, state); - // there are two cases where we need to construct the prevTasks from the ownedPartitions: - // 1) COOPERATIVE clients on version 2.4-2.5 do not encode active tasks and rely on ownedPartitions instead + // there are three cases where we need to construct some or all of the prevTasks from the ownedPartitions: + // 1) COOPERATIVE clients on version 2.4-2.5 do not encode active tasks at all and rely on ownedPartitions // 2) future client during version probing, when we can't decode the future subscription info's prev tasks - if (!state.ownedPartitions().isEmpty() && (uuid == FUTURE_ID || state.prevActiveTasks().isEmpty())) { + // 3) stateless tasks are not encoded in the task lags, and must be 
figured out from the ownedPartitions + if (!state.ownedPartitions().isEmpty()) { final Set previousActiveTasks = new HashSet<>(); for (final Map.Entry partitionEntry : state.ownedPartitions().entrySet()) { final TopicPartition tp = partitionEntry.getKey(); @@ -746,18 +816,69 @@ private void assignTasksToClients(final Set allSourceTopics, } state.addPreviousActiveTasks(previousActiveTasks); } - } - log.debug("Assigning tasks {} to clients {} with number of replicas {}", - partitionsForTask.keySet(), states, assignmentConfigs.numStandbyReplicas); + if (fetchEndOffsetsSuccessful) { + state.computeTaskLags(allTaskEndOffsetSums); + } + clientStates.put(uuid, state); + } + return fetchEndOffsetsSuccessful; + } - // assign tasks to clients - final StickyTaskAssignor taskAssignor = - new StickyTaskAssignor<>(states, partitionsForTask.keySet(), standbyTaskIds); - taskAssignor.assign(assignmentConfigs.numStandbyReplicas); + /** + * @param endOffsets the listOffsets result from the adminClient, or null if the request failed + * @param changelogsByStatefulTask map from stateful task to its set of changelog topic partitions + * + * @return Map from stateful task to its total end offset summed across all changelog partitions + */ + private Map computeEndOffsetSumsByTask(final Map endOffsets, + final Map> changelogsByStatefulTask) { + final Map taskEndOffsetSums = new HashMap<>(); + for (final Map.Entry> taskEntry : changelogsByStatefulTask.entrySet()) { + final TaskId task = taskEntry.getKey(); + final Set changelogs = taskEntry.getValue(); + + taskEndOffsetSums.put(task, 0L); + for (final TopicPartition changelog : changelogs) { + final ListOffsetsResultInfo offsetResult = endOffsets.get(changelog); + if (offsetResult == null) { + log.debug("Fetched end offsets did not contain the changelog {} of task {}", changelog, task); + throw new IllegalStateException("Could not get end offset for " + changelog); + } + taskEndOffsetSums + .computeIfPresent(task, (id, curOffsetSum) -> 
curOffsetSum + offsetResult.offset()); + } + } + return taskEndOffsetSums; + } - log.info("Assigned tasks to clients as {}{}.", Utils.NL, states.entrySet().stream() - .map(Map.Entry::toString).collect(Collectors.joining(Utils.NL))); + /** + * @return Sorted set of all client candidates for each stateful task, ranked by their overall lag + */ + static Map>> buildClientRankingsByTask(final Set statefulTasks, + final Map states, + final long acceptableRecoveryLag) { + final Map>> statefulTasksToRankedCandidates = new TreeMap<>(); + + for (final TaskId task : statefulTasks) { + final SortedSet> rankedClientCandidates = new TreeSet<>(); + statefulTasksToRankedCandidates.put(task, rankedClientCandidates); + + for (final Map.Entry clientEntry : states.entrySet()) { + final UUID clientId = clientEntry.getKey(); + final long taskLag = clientEntry.getValue().lagFor(task); + final long clientRank; + if (taskLag == Task.LATEST_OFFSET) { + clientRank = Task.LATEST_OFFSET; + } else if (taskLag <= acceptableRecoveryLag) { + clientRank = 0; + } else { + clientRank = taskLag; + } + rankedClientCandidates.add(new RankedClient<>(clientId, clientRank)); + } + } + return statefulTasksToRankedCandidates; } /** diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignorConfiguration.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignorConfiguration.java index 6aba3953f5e3c..66f528c4b1fd2 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignorConfiguration.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignorConfiguration.java @@ -45,6 +45,7 @@ public final class AssignorConfiguration { private final String userEndPoint; private final TaskManager taskManager; private final StreamsMetadataState streamsMetadataState; + private final Admin adminClient; private final InternalTopicManager internalTopicManager; private final 
CopartitionedTopicsEnforcer copartitionedTopicsEnforcer; private final StreamsConfig streamsConfig; @@ -144,7 +145,8 @@ public AssignorConfiguration(final Map configs) { throw fatalException; } - internalTopicManager = new InternalTopicManager((Admin) o, streamsConfig); + adminClient = (Admin) o; + internalTopicManager = new InternalTopicManager(adminClient, streamsConfig); } @@ -250,6 +252,10 @@ public String getUserEndPoint() { return userEndPoint; } + public Admin getAdminClient() { + return adminClient; + } + public InternalTopicManager getInternalTopicManager() { return internalTopicManager; } diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/ClientState.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/ClientState.java index df42b14e29e26..fb2e5584d3663 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/ClientState.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/ClientState.java @@ -16,6 +16,8 @@ */ package org.apache.kafka.streams.processor.internals.assignment; +import static org.apache.kafka.streams.processor.internals.assignment.SubscriptionInfo.UNKNOWN_OFFSET_SUM; + import org.apache.kafka.common.TopicPartition; import org.apache.kafka.streams.processor.TaskId; @@ -24,6 +26,7 @@ import java.util.HashSet; import java.util.Map; import java.util.Set; +import org.apache.kafka.streams.processor.internals.Task; public class ClientState { private final Set activeTasks; @@ -34,6 +37,8 @@ public class ClientState { private final Set prevAssignedTasks; private final Map ownedPartitions; + private final Map taskOffsetSums; // contains only stateful tasks we previously owned + private final Map taskLagTotals; // contains lag for all stateful tasks in the app topology private int capacity; @@ -49,7 +54,9 @@ public ClientState() { new HashSet<>(), new HashSet<>(), new HashMap<>(), - capacity); + new HashMap<>(), + 
new HashMap<>(), + capacity); } private ClientState(final Set activeTasks, @@ -59,6 +66,8 @@ private ClientState(final Set activeTasks, final Set prevStandbyTasks, final Set prevAssignedTasks, final Map ownedPartitions, + final Map taskOffsetSums, + final Map taskLagTotals, final int capacity) { this.activeTasks = activeTasks; this.standbyTasks = standbyTasks; @@ -67,6 +76,8 @@ private ClientState(final Set activeTasks, this.prevStandbyTasks = prevStandbyTasks; this.prevAssignedTasks = prevAssignedTasks; this.ownedPartitions = ownedPartitions; + this.taskOffsetSums = taskOffsetSums; + this.taskLagTotals = taskLagTotals; this.capacity = capacity; } @@ -79,6 +90,8 @@ public ClientState copy() { new HashSet<>(prevStandbyTasks), new HashSet<>(prevAssignedTasks), new HashMap<>(ownedPartitions), + new HashMap<>(taskOffsetSums), + new HashMap<>(taskLagTotals), capacity); } @@ -104,7 +117,7 @@ public Set prevActiveTasks() { return prevActiveTasks; } - public Set prevStandbyTasks() { + Set prevStandbyTasks() { return prevStandbyTasks; } @@ -131,7 +144,7 @@ public void addPreviousActiveTasks(final Set prevTasks) { prevAssignedTasks.addAll(prevTasks); } - public void addPreviousStandbyTasks(final Set standbyTasks) { + void addPreviousStandbyTasks(final Set standbyTasks) { prevStandbyTasks.addAll(standbyTasks); prevAssignedTasks.addAll(standbyTasks); } @@ -142,6 +155,62 @@ public void addOwnedPartitions(final Collection ownedPartitions, } } + public void addPreviousTasksAndOffsetSums(final Map taskOffsetSums) { + for (final Map.Entry taskEntry : taskOffsetSums.entrySet()) { + final TaskId id = taskEntry.getKey(); + final long offsetSum = taskEntry.getValue(); + if (offsetSum == Task.LATEST_OFFSET) { + prevActiveTasks.add(id); + } else { + prevStandbyTasks.add(id); + } + prevAssignedTasks.add(id); + } + this.taskOffsetSums.putAll(taskOffsetSums); + } + + public void computeTaskLags(final Map allTaskEndOffsetSums) { + if (!taskLagTotals.isEmpty()) { + throw new 
IllegalStateException("Already computed task lags for this client."); + } + + for (final Map.Entry taskEntry : allTaskEndOffsetSums.entrySet()) { + final TaskId task = taskEntry.getKey(); + final Long endOffsetSum = taskEntry.getValue(); + final Long offsetSum = taskOffsetSums.getOrDefault(task, 0L); + + if (endOffsetSum == UNKNOWN_OFFSET_SUM) { + + } else if (endOffsetSum < offsetSum) { + throw new IllegalStateException("Task " + task + " had endOffsetSum=" + endOffsetSum + + " smaller than offsetSum=" + offsetSum); + } + + if (offsetSum == Task.LATEST_OFFSET) { + taskLagTotals.put(task, Task.LATEST_OFFSET); + } else { + taskLagTotals.put(task, endOffsetSum - offsetSum); + } + } + } + + /** + * Returns the total lag across all logged stores in the task. Equal to the end offset sum if this client + * did not have any state for this task on disk. + * + * @return end offset sum - offset sum + * Task.LATEST_OFFSET if this was previously an active running task on this client + */ + public long lagFor(final TaskId task) { + final Long totalLag = taskLagTotals.get(task); + + if (totalLag == null) { + throw new IllegalStateException("Tried to lookup lag for unknown task " + task); + } else { + return totalLag; + } + } + public void removeFromAssignment(final TaskId task) { activeTasks.remove(task); assignedTasks.remove(task); @@ -156,6 +225,7 @@ public String toString() { ") prevStandbyTasks: (" + prevStandbyTasks + ") prevAssignedTasks: (" + prevAssignedTasks + ") prevOwnedPartitionsByConsumerId: (" + ownedPartitions.keySet() + + ") changelogOffsetTotalsByTask: (" + taskOffsetSums.entrySet() + ") capacity: " + capacity + "]"; } diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/SubscriptionInfo.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/SubscriptionInfo.java index 411ff0435f06c..49496cfc186c2 100644 --- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/SubscriptionInfo.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/SubscriptionInfo.java @@ -46,7 +46,7 @@ public class SubscriptionInfo { static final int UNKNOWN = -1; static final int MIN_VERSION_OFFSET_SUM_SUBSCRIPTION = 7; - static final long UNKNOWN_OFFSET_SUM = -3L; + public static final long UNKNOWN_OFFSET_SUM = -3L; private final SubscriptionInfoData data; private Set prevTasksCache = null; @@ -198,7 +198,7 @@ public Set standbyTasks() { return standbyTasksCache; } - Map taskOffsetSums() { + public Map taskOffsetSums() { if (taskOffsetSumsCache == null) { taskOffsetSumsCache = new HashMap<>(); if (data.version() >= MIN_VERSION_OFFSET_SUM_SUBSCRIPTION) { diff --git a/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java index 7087085a61454..72dddc7f4c14d 100644 --- a/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java @@ -16,13 +16,16 @@ */ package org.apache.kafka.streams; +import java.util.concurrent.ExecutionException; import org.apache.kafka.clients.admin.Admin; +import org.apache.kafka.clients.admin.AdminClient; import org.apache.kafka.clients.admin.ListOffsetsResult; import org.apache.kafka.clients.admin.ListOffsetsResult.ListOffsetsResultInfo; import org.apache.kafka.clients.admin.MockAdminClient; import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.producer.MockProducer; import org.apache.kafka.common.Cluster; +import org.apache.kafka.common.KafkaFuture; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.internals.KafkaFutureImpl; import org.apache.kafka.common.metrics.MetricConfig; @@ -33,6 +36,7 @@ import org.apache.kafka.common.serialization.StringSerializer; import 
org.apache.kafka.common.utils.MockTime; import org.apache.kafka.common.utils.Time; +import org.apache.kafka.streams.errors.StreamsException; import org.apache.kafka.streams.internals.metrics.ClientMetrics; import org.apache.kafka.streams.kstream.Materialized; import org.apache.kafka.streams.processor.AbstractProcessor; @@ -83,16 +87,20 @@ import static java.util.Collections.emptyList; import static java.util.Collections.singletonList; +import static org.apache.kafka.streams.KafkaStreams.fetchEndOffsets; import static org.easymock.EasyMock.anyInt; import static org.easymock.EasyMock.anyLong; import static org.easymock.EasyMock.anyObject; import static org.easymock.EasyMock.anyString; import static org.easymock.EasyMock.capture; +import static org.easymock.EasyMock.replay; +import static org.easymock.EasyMock.verify; import static org.hamcrest.CoreMatchers.hasItem; import static org.hamcrest.MatcherAssert.assertThat; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertThrows; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; @@ -883,6 +891,45 @@ public void statefulTopologyShouldCreateStateDirectory() throws Exception { startStreamsAndCheckDirExists(topology, true); } + @Test + public void fetchEndOffsetsShouldRethrowRuntimeExceptionAsStreamsException() { + final Admin adminClient = EasyMock.createMock(AdminClient.class); + EasyMock.expect(adminClient.listOffsets(EasyMock.anyObject())).andThrow(new RuntimeException()); + replay(adminClient); + assertThrows(StreamsException.class, () -> fetchEndOffsets(emptyList(), adminClient)); + verify(adminClient); + } + + @Test + public void fetchEndOffsetsShouldRethrowInterruptedExceptionAsStreamsException() throws InterruptedException, ExecutionException { + final Admin adminClient = EasyMock.createMock(AdminClient.class); + final ListOffsetsResult result = 
EasyMock.createNiceMock(ListOffsetsResult.class); + final KafkaFuture> allFuture = EasyMock.createMock(KafkaFuture.class); + + EasyMock.expect(adminClient.listOffsets(EasyMock.anyObject())).andStubReturn(result); + EasyMock.expect(result.all()).andStubReturn(allFuture); + EasyMock.expect(allFuture.get()).andThrow(new InterruptedException()); + replay(adminClient, result, allFuture); + + assertThrows(StreamsException.class, () -> fetchEndOffsets(emptyList(), adminClient)); + verify(adminClient); + } + + @Test + public void fetchEndOffsetsShouldRethrowExecutionExceptionAsStreamsException() throws InterruptedException, ExecutionException { + final Admin adminClient = EasyMock.createMock(AdminClient.class); + final ListOffsetsResult result = EasyMock.createNiceMock(ListOffsetsResult.class); + final KafkaFuture> allFuture = EasyMock.createMock(KafkaFuture.class); + + EasyMock.expect(adminClient.listOffsets(EasyMock.anyObject())).andStubReturn(result); + EasyMock.expect(result.all()).andStubReturn(allFuture); + EasyMock.expect(allFuture.get()).andThrow(new ExecutionException(new RuntimeException())); + replay(adminClient, result, allFuture); + + assertThrows(StreamsException.class, () -> fetchEndOffsets(emptyList(), adminClient)); + verify(adminClient); + } + @SuppressWarnings("unchecked") private Topology getStatefulTopology(final String inputTopic, final String outputTopic, diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java index 820ead807e120..03393fc1c786d 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java @@ -16,7 +16,12 @@ */ package org.apache.kafka.streams.processor.internals; +import java.util.Map.Entry; +import java.util.SortedSet; 
import org.apache.kafka.clients.admin.Admin; +import org.apache.kafka.clients.admin.AdminClient; +import org.apache.kafka.clients.admin.ListOffsetsResult; +import org.apache.kafka.clients.admin.ListOffsetsResult.ListOffsetsResultInfo; import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor.Assignment; import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor.GroupSubscription; import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor.RebalanceProtocol; @@ -27,6 +32,7 @@ import org.apache.kafka.common.PartitionInfo; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.config.ConfigException; +import org.apache.kafka.common.internals.KafkaFutureImpl; import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.StreamsBuilder; import org.apache.kafka.streams.StreamsConfig; @@ -38,6 +44,7 @@ import org.apache.kafka.streams.kstream.Materialized; import org.apache.kafka.streams.kstream.ValueJoiner; import org.apache.kafka.streams.processor.TaskId; +import org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor.RankedClient; import org.apache.kafka.streams.processor.internals.assignment.AssignmentInfo; import org.apache.kafka.streams.processor.internals.assignment.AssignorError; import org.apache.kafka.streams.processor.internals.assignment.ClientState; @@ -49,6 +56,7 @@ import org.apache.kafka.test.MockProcessorSupplier; import org.easymock.Capture; import org.easymock.EasyMock; +import org.junit.Before; import org.junit.Test; import java.nio.ByteBuffer; @@ -69,11 +77,16 @@ import static java.util.Collections.emptyList; import static java.util.Collections.emptyMap; import static java.util.Collections.emptySet; -import static java.util.Collections.singletonMap; +import static java.util.Collections.singleton; import static org.apache.kafka.common.utils.Utils.mkEntry; import static org.apache.kafka.common.utils.Utils.mkMap; import static org.apache.kafka.common.utils.Utils.mkSet; +import static 
org.apache.kafka.common.utils.Utils.mkSortedSet; +import static org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor.buildClientRankingsByTask; import static org.apache.kafka.streams.processor.internals.assignment.StreamsAssignmentProtocolVersions.LATEST_SUPPORTED_VERSION; +import static org.easymock.EasyMock.anyObject; +import static org.easymock.EasyMock.expect; +import static org.easymock.EasyMock.replay; import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.CoreMatchers.not; import static org.hamcrest.Matchers.is; @@ -156,9 +169,11 @@ public class StreamsPartitionAssignorTest { private final Set emptyTasks = emptySet(); private final Map emptyTaskOffsetSums = emptyMap(); + private final Map emptyChangelogEndOffsets = new HashMap<>(); + private final UUID uuid1 = UUID.randomUUID(); private final UUID uuid2 = UUID.randomUUID(); - + private final UUID uuid3 = UUID.randomUUID(); private final SubscriptionInfo defaultSubscriptionInfo = getInfo(uuid1, emptyTasks, emptyTasks); private final Cluster metadata = new Cluster( @@ -174,8 +189,10 @@ public class StreamsPartitionAssignorTest { private static final String USER_END_POINT = "localhost:8080"; private static final String OTHER_END_POINT = "other:9090"; private static final String APPLICATION_ID = "stream-partition-assignor-test"; + private static final long ACCEPTABLE_RECOVERY_LAG = 100L; private TaskManager taskManager; + private Admin adminClient; private InternalTopologyBuilder builder = new InternalTopologyBuilder(); private StreamsMetadataState streamsMetadataState = EasyMock.createNiceMock(StreamsMetadataState.class); private final Map subscriptions = new HashMap<>(); @@ -186,7 +203,7 @@ private Map configProps() { configurationMap.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, USER_END_POINT); configurationMap.put(StreamsConfig.InternalConfig.TASK_MANAGER_FOR_PARTITION_ASSIGNOR, taskManager); 
configurationMap.put(StreamsConfig.InternalConfig.STREAMS_METADATA_STATE_FOR_PARTITION_ASSIGNOR, streamsMetadataState); - configurationMap.put(StreamsConfig.InternalConfig.STREAMS_ADMIN_CLIENT, EasyMock.createNiceMock(Admin.class)); + configurationMap.put(StreamsConfig.InternalConfig.STREAMS_ADMIN_CLIENT, adminClient); configurationMap.put(StreamsConfig.InternalConfig.ASSIGNMENT_ERROR_CODE, new AtomicInteger()); return configurationMap; } @@ -199,7 +216,7 @@ private void configureDefault() { // TaskManager must be created first private void configureDefaultPartitionAssignor() { partitionAssignor.configure(configProps()); - EasyMock.replay(taskManager); + EasyMock.replay(taskManager, adminClient); } // TaskManager must be created first @@ -207,7 +224,7 @@ private void configurePartitionAssignorWith(final Map props) { final Map configurationMap = configProps(); configurationMap.putAll(props); partitionAssignor.configure(configurationMap); - EasyMock.replay(taskManager); + EasyMock.replay(taskManager, adminClient); } private void createDefaultMockTaskManager() { @@ -223,13 +240,39 @@ private void createMockTaskManager(final Set activeTasks, private void createMockTaskManager(final Map taskOffsetSums, final UUID processId) { taskManager = EasyMock.createNiceMock(TaskManager.class); - EasyMock.expect(taskManager.builder()).andReturn(builder).anyTimes(); - EasyMock.expect(taskManager.getTaskOffsetSums()).andReturn(taskOffsetSums).anyTimes(); - EasyMock.expect(taskManager.processId()).andReturn(processId).anyTimes(); + expect(taskManager.builder()).andReturn(builder).anyTimes(); + expect(taskManager.getTaskOffsetSums()).andReturn(taskOffsetSums).anyTimes(); + expect(taskManager.processId()).andReturn(processId).anyTimes(); builder.setApplicationId(APPLICATION_ID); builder.buildTopology(); } + private void createMockAdminClient(final Map changelogEndOffsets) { + adminClient = EasyMock.createMock(AdminClient.class); + + final ListOffsetsResult result = 
EasyMock.createNiceMock(ListOffsetsResult.class); + final KafkaFutureImpl> allFuture = new KafkaFutureImpl<>(); + allFuture.complete(changelogEndOffsets.entrySet().stream().collect(Collectors.toMap( + Entry::getKey, + t -> { + final ListOffsetsResultInfo info = EasyMock.createNiceMock(ListOffsetsResultInfo.class); + expect(info.offset()).andStubReturn(t.getValue()); + EasyMock.replay(info); + return info; + })) + ); + + expect(result.all()).andReturn(allFuture); + expect(adminClient.listOffsets(anyObject())).andStubReturn(result); + + EasyMock.replay(result); + } + + @Before + public void setUp() { + createMockAdminClient(emptyChangelogEndOffsets); + } + @Test public void shouldUseEagerRebalancingProtocol() { createDefaultMockTaskManager(); @@ -1287,7 +1330,7 @@ public void shouldUpdateClusterMetadataAndHostInfoOnAssignment() { final Map> initialHostState = mkMap( mkEntry(new HostInfo("localhost", 9090), mkSet(t1p0, t1p1)), mkEntry(new HostInfo("otherhost", 9090), mkSet(t2p0, t2p1)) - ); + ); final Map> newHostState = mkMap( mkEntry(new HostInfo("localhost", 9090), mkSet(t1p0, t1p1)), @@ -1322,7 +1365,7 @@ public void shouldTriggerRebalanceOnHostInfoChange() { ); createDefaultMockTaskManager(); - configurePartitionAssignorWith(singletonMap(StreamsConfig.APPLICATION_SERVER_CONFIG, "newhost:9090")); + configurePartitionAssignorWith(Collections.singletonMap(StreamsConfig.APPLICATION_SERVER_CONFIG, "newhost:9090")); partitionAssignor.onAssignment(createAssignment(oldHostState), null); @@ -1795,6 +1838,93 @@ public void shouldGetAssignmentConfigs() { assertThat(partitionAssignor.probingRebalanceIntervalMs(), equalTo(55 * 60 * 1000L)); } + @Test + public void shouldRankPreviousClientAboveEquallyCaughtUpClient() { + final ClientState client1 = EasyMock.createMock(ClientState.class); + final ClientState client2 = EasyMock.createMock(ClientState.class); + + expect(client1.lagFor(task0_0)).andReturn(Task.LATEST_OFFSET); + expect(client2.lagFor(task0_0)).andReturn(0L); + + 
final SortedSet> expectedClientRanking = mkSortedSet( + new RankedClient<>(uuid1, Task.LATEST_OFFSET), + new RankedClient<>(uuid2, 0L) + ); + + replay(client1, client2); + + final Map states = mkMap( + mkEntry(uuid1, client1), + mkEntry(uuid2, client2) + ); + + final Map>> statefulTasksToRankedCandidates = + buildClientRankingsByTask(singleton(task0_0), states, ACCEPTABLE_RECOVERY_LAG); + + final SortedSet> clientRanking = statefulTasksToRankedCandidates.get(task0_0); + + EasyMock.verify(client1, client2); + assertThat(clientRanking, equalTo(expectedClientRanking)); + } + + @Test + public void shouldRankAllClientsWithinAcceptableRecoveryLagWithRank0() { + final ClientState client1 = EasyMock.createMock(ClientState.class); + final ClientState client2 = EasyMock.createMock(ClientState.class); + + expect(client1.lagFor(task0_0)).andReturn(100L); + expect(client2.lagFor(task0_0)).andReturn(0L); + + final SortedSet> expectedClientRanking = mkSortedSet( + new RankedClient<>(uuid1, 0L), + new RankedClient<>(uuid2, 0L) + ); + + replay(client1, client2); + + final Map states = mkMap( + mkEntry(uuid1, client1), + mkEntry(uuid2, client2) + ); + + final Map>> statefulTasksToRankedCandidates = + buildClientRankingsByTask(singleton(task0_0), states, ACCEPTABLE_RECOVERY_LAG); + + EasyMock.verify(client1, client2); + assertThat(statefulTasksToRankedCandidates.get(task0_0), equalTo(expectedClientRanking)); + } + + @Test + public void shouldRankNotCaughtUpClientsAccordingToLag() { + final ClientState client1 = EasyMock.createMock(ClientState.class); + final ClientState client2 = EasyMock.createMock(ClientState.class); + final ClientState client3 = EasyMock.createMock(ClientState.class); + + expect(client1.lagFor(task0_0)).andReturn(900L); + expect(client2.lagFor(task0_0)).andReturn(800L); + expect(client3.lagFor(task0_0)).andReturn(500L); + + final SortedSet> expectedClientRanking = mkSortedSet( + new RankedClient<>(uuid3, 500L), + new RankedClient<>(uuid2, 800L), + new 
RankedClient<>(uuid1, 900L) + ); + + replay(client1, client2, client3); + + final Map states = mkMap( + mkEntry(uuid1, client1), + mkEntry(uuid2, client2), + mkEntry(uuid3, client3) + ); + + final Map>> statefulTasksToRankedCandidates = + buildClientRankingsByTask(singleton(task0_0), states, ACCEPTABLE_RECOVERY_LAG); + + EasyMock.verify(client1, client2, client3); + assertThat(statefulTasksToRankedCandidates.get(task0_0), equalTo(expectedClientRanking)); + } + private static ByteBuffer encodeFutureSubscription() { final ByteBuffer buf = ByteBuffer.allocate(4 /* used version */ + 4 /* supported version */); buf.putInt(LATEST_SUPPORTED_VERSION + 1); @@ -1907,7 +2037,7 @@ private static SubscriptionInfo getInfoForOlderVersion(final int version, } // Stub offset sums for when we only care about the prev/standby task sets, not the actual offsets - static Map getTaskOffsetSums(final Set activeTasks, final Set standbyTasks) { + private static Map getTaskOffsetSums(final Set activeTasks, final Set standbyTasks) { final Map taskOffsetSums = activeTasks.stream().collect(Collectors.toMap(t -> t, t -> Task.LATEST_OFFSET)); taskOffsetSums.putAll(standbyTasks.stream().collect(Collectors.toMap(t -> t, t -> 0L))); return taskOffsetSums; diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/ClientStateTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/ClientStateTest.java index 1443edf13c0c3..6d3c5e776e1e0 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/ClientStateTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/ClientStateTest.java @@ -16,20 +16,30 @@ */ package org.apache.kafka.streams.processor.internals.assignment; +import java.util.Map; import org.apache.kafka.common.utils.Utils; import org.apache.kafka.streams.processor.TaskId; +import org.apache.kafka.streams.processor.internals.Task; import org.junit.Test; import 
java.util.Collections; +import static org.apache.kafka.common.utils.Utils.mkEntry; +import static org.apache.kafka.common.utils.Utils.mkMap; +import static org.apache.kafka.common.utils.Utils.mkSet; import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.MatcherAssert.assertThat; import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertThrows; import static org.junit.Assert.assertTrue; public class ClientStateTest { private final ClientState client = new ClientState(1); + private final ClientState zeroCapacityClient = new ClientState(0); + + private final TaskId taskId01 = new TaskId(0, 1); + private final TaskId taskId02 = new TaskId(0, 2); @Test public void shouldHaveNotReachedCapacityWhenAssignedTasksLessThanCapacity() { @@ -38,142 +48,203 @@ public void shouldHaveNotReachedCapacityWhenAssignedTasksLessThanCapacity() { @Test public void shouldHaveReachedCapacityWhenAssignedTasksGreaterThanOrEqualToCapacity() { - client.assign(new TaskId(0, 1), true); + client.assign(taskId01, true); assertTrue(client.reachedCapacity()); } - @Test public void shouldAddActiveTasksToBothAssignedAndActive() { - final TaskId tid = new TaskId(0, 1); - - client.assign(tid, true); - assertThat(client.activeTasks(), equalTo(Collections.singleton(tid))); - assertThat(client.assignedTasks(), equalTo(Collections.singleton(tid))); + client.assign(taskId01, true); + assertThat(client.activeTasks(), equalTo(Collections.singleton(taskId01))); + assertThat(client.assignedTasks(), equalTo(Collections.singleton(taskId01))); assertThat(client.assignedTaskCount(), equalTo(1)); assertThat(client.standbyTasks().size(), equalTo(0)); } @Test public void shouldAddStandbyTasksToBothStandbyAndActive() { - final TaskId tid = new TaskId(0, 1); - - client.assign(tid, false); - assertThat(client.assignedTasks(), equalTo(Collections.singleton(tid))); - assertThat(client.standbyTasks(), equalTo(Collections.singleton(tid))); + client.assign(taskId01, false); + 
assertThat(client.assignedTasks(), equalTo(Collections.singleton(taskId01))); + assertThat(client.standbyTasks(), equalTo(Collections.singleton(taskId01))); assertThat(client.assignedTaskCount(), equalTo(1)); assertThat(client.activeTasks().size(), equalTo(0)); } @Test public void shouldAddPreviousActiveTasksToPreviousAssignedAndPreviousActive() { - final TaskId tid1 = new TaskId(0, 1); - final TaskId tid2 = new TaskId(0, 2); - - client.addPreviousActiveTasks(Utils.mkSet(tid1, tid2)); - assertThat(client.prevActiveTasks(), equalTo(Utils.mkSet(tid1, tid2))); - assertThat(client.previousAssignedTasks(), equalTo(Utils.mkSet(tid1, tid2))); + client.addPreviousActiveTasks(Utils.mkSet(taskId01, taskId02)); + assertThat(client.prevActiveTasks(), equalTo(Utils.mkSet(taskId01, taskId02))); + assertThat(client.previousAssignedTasks(), equalTo(Utils.mkSet(taskId01, taskId02))); } @Test public void shouldAddPreviousStandbyTasksToPreviousAssigned() { - final TaskId tid1 = new TaskId(0, 1); - final TaskId tid2 = new TaskId(0, 2); - - client.addPreviousStandbyTasks(Utils.mkSet(tid1, tid2)); + client.addPreviousStandbyTasks(Utils.mkSet(taskId01, taskId02)); assertThat(client.prevActiveTasks().size(), equalTo(0)); - assertThat(client.previousAssignedTasks(), equalTo(Utils.mkSet(tid1, tid2))); + assertThat(client.previousAssignedTasks(), equalTo(Utils.mkSet(taskId01, taskId02))); } @Test public void shouldHaveAssignedTaskIfActiveTaskAssigned() { - final TaskId tid = new TaskId(0, 2); - - client.assign(tid, true); - assertTrue(client.hasAssignedTask(tid)); + client.assign(taskId01, true); + assertTrue(client.hasAssignedTask(taskId01)); } @Test public void shouldHaveAssignedTaskIfStandbyTaskAssigned() { - final TaskId tid = new TaskId(0, 2); - - client.assign(tid, false); - assertTrue(client.hasAssignedTask(tid)); + client.assign(taskId01, false); + assertTrue(client.hasAssignedTask(taskId01)); } @Test public void shouldNotHaveAssignedTaskIfTaskNotAssigned() { - - client.assign(new 
TaskId(0, 2), true); - assertFalse(client.hasAssignedTask(new TaskId(0, 3))); + client.assign(taskId01, true); + assertFalse(client.hasAssignedTask(taskId02)); } @Test public void shouldHaveMoreAvailableCapacityWhenCapacityTheSameButFewerAssignedTasks() { - final ClientState c2 = new ClientState(1); - client.assign(new TaskId(0, 1), true); - assertTrue(c2.hasMoreAvailableCapacityThan(client)); - assertFalse(client.hasMoreAvailableCapacityThan(c2)); + final ClientState otherClient = new ClientState(1); + client.assign(taskId01, true); + assertTrue(otherClient.hasMoreAvailableCapacityThan(client)); + assertFalse(client.hasMoreAvailableCapacityThan(otherClient)); } @Test public void shouldHaveMoreAvailableCapacityWhenCapacityHigherAndSameAssignedTaskCount() { - final ClientState c2 = new ClientState(2); - assertTrue(c2.hasMoreAvailableCapacityThan(client)); - assertFalse(client.hasMoreAvailableCapacityThan(c2)); + final ClientState otherClient = new ClientState(2); + assertTrue(otherClient.hasMoreAvailableCapacityThan(client)); + assertFalse(client.hasMoreAvailableCapacityThan(otherClient)); } @Test public void shouldUseMultiplesOfCapacityToDetermineClientWithMoreAvailableCapacity() { - final ClientState c2 = new ClientState(2); + final ClientState otherClient = new ClientState(2); for (int i = 0; i < 7; i++) { - c2.assign(new TaskId(0, i), true); + otherClient.assign(new TaskId(0, i), true); } for (int i = 7; i < 11; i++) { client.assign(new TaskId(0, i), true); } - assertTrue(c2.hasMoreAvailableCapacityThan(client)); + assertTrue(otherClient.hasMoreAvailableCapacityThan(client)); } @Test public void shouldHaveMoreAvailableCapacityWhenCapacityIsTheSameButAssignedTasksIsLess() { - final ClientState c1 = new ClientState(3); - final ClientState c2 = new ClientState(3); + final ClientState client = new ClientState(3); + final ClientState otherClient = new ClientState(3); for (int i = 0; i < 4; i++) { - c1.assign(new TaskId(0, i), true); - c2.assign(new TaskId(0, i), 
true); + client.assign(new TaskId(0, i), true); + otherClient.assign(new TaskId(0, i), true); } - c2.assign(new TaskId(0, 5), true); - assertTrue(c1.hasMoreAvailableCapacityThan(c2)); + otherClient.assign(new TaskId(0, 5), true); + assertTrue(client.hasMoreAvailableCapacityThan(otherClient)); } - @Test(expected = IllegalStateException.class) + @Test public void shouldThrowIllegalStateExceptionIfCapacityOfThisClientStateIsZero() { - final ClientState c1 = new ClientState(0); - c1.hasMoreAvailableCapacityThan(new ClientState(1)); + assertThrows(IllegalStateException.class, () -> zeroCapacityClient.hasMoreAvailableCapacityThan(client)); } - @Test(expected = IllegalStateException.class) + @Test public void shouldThrowIllegalStateExceptionIfCapacityOfOtherClientStateIsZero() { - final ClientState c1 = new ClientState(1); - c1.hasMoreAvailableCapacityThan(new ClientState(0)); + assertThrows(IllegalStateException.class, () -> client.hasMoreAvailableCapacityThan(zeroCapacityClient)); } @Test public void shouldHaveUnfulfilledQuotaWhenActiveTaskSizeLessThanCapacityTimesTasksPerThread() { - final ClientState client = new ClientState(1); client.assign(new TaskId(0, 1), true); assertTrue(client.hasUnfulfilledQuota(2)); } @Test public void shouldNotHaveUnfulfilledQuotaWhenActiveTaskSizeGreaterEqualThanCapacityTimesTasksPerThread() { - final ClientState client = new ClientState(1); client.assign(new TaskId(0, 1), true); assertFalse(client.hasUnfulfilledQuota(1)); } + @Test + public void shouldAddTasksWithLatestOffsetToPrevActiveTasks() { + final Map taskOffsetSums = Collections.singletonMap(taskId01, Task.LATEST_OFFSET); + client.addPreviousTasksAndOffsetSums(taskOffsetSums); + assertThat(client.prevActiveTasks(), equalTo(Collections.singleton(taskId01))); + assertThat(client.previousAssignedTasks(), equalTo(Collections.singleton(taskId01))); + assertTrue(client.prevStandbyTasks().isEmpty()); + } + + @Test + public void shouldAddTasksInOffsetSumsMapToPrevStandbyTasks() { + final 
Map taskOffsetSums = mkMap( + mkEntry(taskId01, 0L), + mkEntry(taskId02, 100L) + ); + client.addPreviousTasksAndOffsetSums(taskOffsetSums); + assertThat(client.prevStandbyTasks(), equalTo(mkSet(taskId01, taskId02))); + assertThat(client.previousAssignedTasks(), equalTo(mkSet(taskId01, taskId02))); + assertTrue(client.prevActiveTasks().isEmpty()); + } + + @Test + public void shouldComputeTaskLags() { + final Map taskOffsetSums = mkMap( + mkEntry(taskId01, 0L), + mkEntry(taskId02, 100L) + ); + final Map allTaskEndOffsetSums = mkMap( + mkEntry(taskId01, 500L), + mkEntry(taskId02, 100L) + ); + client.addPreviousTasksAndOffsetSums(taskOffsetSums); + client.computeTaskLags(allTaskEndOffsetSums); + + assertThat(client.lagFor(taskId01), equalTo(500L)); + assertThat(client.lagFor(taskId02), equalTo(0L)); + } + + @Test + public void shouldReturnEndOffsetSumForLagOfTaskWeDidNotPreviouslyOwn() { + final Map taskOffsetSums = Collections.emptyMap(); + final Map allTaskEndOffsetSums = Collections.singletonMap(taskId01, 500L); + client.addPreviousTasksAndOffsetSums(taskOffsetSums); + client.computeTaskLags(allTaskEndOffsetSums); + assertThat(client.lagFor(taskId01), equalTo(500L)); + } + + @Test + public void shouldReturnLatestOffsetForLagOfPreviousActiveRunningTask() { + final Map taskOffsetSums = Collections.singletonMap(taskId01, Task.LATEST_OFFSET); + final Map allTaskEndOffsetSums = Collections.singletonMap(taskId01, 500L); + client.addPreviousTasksAndOffsetSums(taskOffsetSums); + client.computeTaskLags(allTaskEndOffsetSums); + assertThat(client.lagFor(taskId01), equalTo(Task.LATEST_OFFSET)); + } + + @Test + public void shouldThrowIllegalStateExceptionIfOffsetSumIsGreaterThanEndOffsetSum() { + final Map taskOffsetSums = Collections.singletonMap(taskId01, 5L); + final Map allTaskEndOffsetSums = Collections.singletonMap(taskId01, 1L); + client.addPreviousTasksAndOffsetSums(taskOffsetSums); + assertThrows(IllegalStateException.class, () -> 
client.computeTaskLags(allTaskEndOffsetSums)); + } + + @Test + public void shouldThrowIllegalStateExceptionIfTaskLagsMapIsNotEmpty() { + final Map taskOffsetSums = Collections.singletonMap(taskId01, 5L); + final Map allTaskEndOffsetSums = Collections.singletonMap(taskId01, 1L); + client.computeTaskLags(taskOffsetSums); + assertThrows(IllegalStateException.class, () -> client.computeTaskLags(allTaskEndOffsetSums)); + } + + @Test + public void shouldThrowIllegalStateExceptionOnLagForUnknownTask() { + final Map taskOffsetSums = Collections.singletonMap(taskId01, 0L); + final Map allTaskEndOffsetSums = Collections.singletonMap(taskId01, 500L); + client.addPreviousTasksAndOffsetSums(taskOffsetSums); + client.computeTaskLags(allTaskEndOffsetSums); + assertThrows(IllegalStateException.class, () -> client.lagFor(taskId02)); + } + } \ No newline at end of file From a86546e386fa885fba55c68bb6ee9d529c0e8329 Mon Sep 17 00:00:00 2001 From: ableegoldman Date: Thu, 19 Mar 2020 11:45:35 -0700 Subject: [PATCH 2/7] rank UNKNOWN_OFFSET_SUM behind caught-up clients, but above others --- .../internals/StreamsPartitionAssignor.java | 22 +- .../internals/assignment/ClientState.java | 9 +- .../StreamsPartitionAssignorTest.java | 248 +++++++++++++----- .../internals/assignment/ClientStateTest.java | 10 + 4 files changed, 209 insertions(+), 80 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java index 6d89b50a64896..1f6277689d6df 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java @@ -69,6 +69,7 @@ import static org.apache.kafka.streams.processor.internals.assignment.StreamsAssignmentProtocolVersions.EARLIEST_PROBEABLE_VERSION; import static 
org.apache.kafka.streams.processor.internals.assignment.StreamsAssignmentProtocolVersions.LATEST_SUPPORTED_VERSION; import static org.apache.kafka.streams.processor.internals.assignment.StreamsAssignmentProtocolVersions.UNKNOWN; +import static org.apache.kafka.streams.processor.internals.assignment.SubscriptionInfo.UNKNOWN_OFFSET_SUM; public class StreamsPartitionAssignor implements ConsumerPartitionAssignor, Configurable { @@ -745,13 +746,14 @@ private void assignTasksToClients(final Set allSourceTopics, final boolean lagComputationSuccessful = populateClientStatesMap(clientStates, clientMetadataMap, taskForPartition, changelogsByStatefulTask); - final Map>> statefulTasksToRankedCandidates = - buildClientRankingsByTask(changelogsByStatefulTask.keySet(), clientStates, assignmentConfigs.acceptableRecoveryLag); - // assign tasks to clients if (lagComputationSuccessful) { + final Map>> statefulTasksToRankedCandidates = + buildClientRankingsByTask(changelogsByStatefulTask.keySet(), clientStates, assignmentConfigs.acceptableRecoveryLag); + log.debug("Assigning tasks {} to clients {} with number of replicas {}", partitionsForTask.keySet(), clientStates, assignmentConfigs.numStandbyReplicas); + final StickyTaskAssignor taskAssignor = new StickyTaskAssignor<>(clientStates, partitionsForTask.keySet(), statefulTasksToRankedCandidates.keySet()); taskAssignor.assign(assignmentConfigs.numStandbyReplicas); @@ -790,7 +792,7 @@ private boolean populateClientStatesMap(final Map clientState allTaskEndOffsetSums = computeEndOffsetSumsByTask(endOffsets, changelogsByStatefulTask); fetchEndOffsetsSuccessful = true; - } catch(final StreamsException e) { + } catch (final StreamsException e) { allTaskEndOffsetSums = null; fetchEndOffsetsSuccessful = false; } @@ -845,14 +847,18 @@ private Map computeEndOffsetSumsByTask(final Map curOffsetSum + offsetResult.offset()); + taskEndOffsetSums.computeIfPresent(task, (id, curOffsetSum) -> curOffsetSum + offsetResult.offset()); } } return 
taskEndOffsetSums; } /** + * Rankings are computed as follows, with lower being more caught up: + * Rank -1: active running task + * Rank 0: standby or restoring task whose overall lag is within the acceptableRecoveryLag bounds + * Rank 1: tasks whose lag is unknown, eg because it was not encoded in an older version subscription + * Rank 1+: all other tasks are ranked according to their actual total lag * @return Sorted set of all client candidates for each stateful task, ranked by their overall lag */ static Map>> buildClientRankingsByTask(final Set statefulTasks, @@ -870,8 +876,10 @@ static Map>> buildClientRankingsByTask(fina final long clientRank; if (taskLag == Task.LATEST_OFFSET) { clientRank = Task.LATEST_OFFSET; + } else if (taskLag == UNKNOWN_OFFSET_SUM) { + clientRank = 1L; } else if (taskLag <= acceptableRecoveryLag) { - clientRank = 0; + clientRank = 0L; } else { clientRank = taskLag; } diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/ClientState.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/ClientState.java index fb2e5584d3663..7a84f921a8cb2 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/ClientState.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/ClientState.java @@ -169,6 +169,9 @@ public void addPreviousTasksAndOffsetSums(final Map taskOffsetSums this.taskOffsetSums.putAll(taskOffsetSums); } + /** + * Compute the lag for each stateful task, including tasks this client did not previously have. 
+ */ public void computeTaskLags(final Map allTaskEndOffsetSums) { if (!taskLagTotals.isEmpty()) { throw new IllegalStateException("Already computed task lags for this client."); @@ -179,15 +182,15 @@ public void computeTaskLags(final Map allTaskEndOffsetSums) { final Long endOffsetSum = taskEntry.getValue(); final Long offsetSum = taskOffsetSums.getOrDefault(task, 0L); - if (endOffsetSum == UNKNOWN_OFFSET_SUM) { - - } else if (endOffsetSum < offsetSum) { + if (endOffsetSum < offsetSum) { throw new IllegalStateException("Task " + task + " had endOffsetSum=" + endOffsetSum + " smaller than offsetSum=" + offsetSum); } if (offsetSum == Task.LATEST_OFFSET) { taskLagTotals.put(task, Task.LATEST_OFFSET); + } else if (offsetSum == UNKNOWN_OFFSET_SUM) { + taskLagTotals.put(task, UNKNOWN_OFFSET_SUM); } else { taskLagTotals.put(task, endOffsetSum - offsetSum); } diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java index 03393fc1c786d..20b43cd948f3e 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java @@ -78,12 +78,15 @@ import static java.util.Collections.emptyMap; import static java.util.Collections.emptySet; import static java.util.Collections.singleton; +import static java.util.Collections.singletonList; +import static java.util.Collections.singletonMap; import static org.apache.kafka.common.utils.Utils.mkEntry; import static org.apache.kafka.common.utils.Utils.mkMap; import static org.apache.kafka.common.utils.Utils.mkSet; import static org.apache.kafka.common.utils.Utils.mkSortedSet; import static org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor.buildClientRankingsByTask; import static 
org.apache.kafka.streams.processor.internals.assignment.StreamsAssignmentProtocolVersions.LATEST_SUPPORTED_VERSION; +import static org.apache.kafka.streams.processor.internals.assignment.SubscriptionInfo.UNKNOWN_OFFSET_SUM; import static org.easymock.EasyMock.anyObject; import static org.easymock.EasyMock.expect; import static org.easymock.EasyMock.replay; @@ -185,7 +188,6 @@ public class StreamsPartitionAssignorTest { private final StreamsPartitionAssignor partitionAssignor = new StreamsPartitionAssignor(); private final MockClientSupplier mockClientSupplier = new MockClientSupplier(); - private final StreamsConfig streamsConfig = new StreamsConfig(configProps()); private static final String USER_END_POINT = "localhost:8080"; private static final String OTHER_END_POINT = "other:9090"; private static final String APPLICATION_ID = "stream-partition-assignor-test"; @@ -193,6 +195,7 @@ public class StreamsPartitionAssignorTest { private TaskManager taskManager; private Admin adminClient; + private StreamsConfig streamsConfig = new StreamsConfig(configProps()); private InternalTopologyBuilder builder = new InternalTopologyBuilder(); private StreamsMetadataState streamsMetadataState = EasyMock.createNiceMock(StreamsMetadataState.class); private final Map subscriptions = new HashMap<>(); @@ -213,17 +216,18 @@ private void configureDefault() { configureDefaultPartitionAssignor(); } - // TaskManager must be created first + // Make sure to complete setting up any mocks (such as TaskManager or AdminClient) before configuring the assignor private void configureDefaultPartitionAssignor() { - partitionAssignor.configure(configProps()); - EasyMock.replay(taskManager, adminClient); + configurePartitionAssignorWith(emptyMap()); } - // TaskManager must be created first + // Make sure to complete setting up any mocks (such as TaskManager or AdminClient) before configuring the assignor private void configurePartitionAssignorWith(final Map props) { - final Map configurationMap = 
configProps(); - configurationMap.putAll(props); - partitionAssignor.configure(configurationMap); + final Map configMap = configProps(); + configMap.putAll(props); + + streamsConfig = new StreamsConfig(configMap); + partitionAssignor.configure(configMap); EasyMock.replay(taskManager, adminClient); } @@ -247,6 +251,8 @@ private void createMockTaskManager(final Map taskOffsetSums, builder.buildTopology(); } + // If you don't care about setting the end offsets for each specific topic partition, the helper method + // getTopicPartitionOffsetsMap is useful for building this input map based on the topics and number of partitions private void createMockAdminClient(final Map changelogEndOffsets) { adminClient = EasyMock.createMock(AdminClient.class); @@ -262,12 +268,18 @@ private void createMockAdminClient(final Map changelogEndO })) ); - expect(result.all()).andReturn(allFuture); expect(adminClient.listOffsets(anyObject())).andStubReturn(result); + expect(result.all()).andReturn(allFuture); EasyMock.replay(result); } + private MockInternalTopicManager overwriteInternalTopicManagerWithMock() { + final MockInternalTopicManager mockInternalTopicManager = new MockInternalTopicManager(streamsConfig, mockClientSupplier.restoreConsumer); + partitionAssignor.setInternalTopicManager(mockInternalTopicManager); + return mockInternalTopicManager; + } + @Before public void setUp() { createMockAdminClient(emptyChangelogEndOffsets); @@ -520,8 +532,7 @@ public void testAssignBasic() { createMockTaskManager(prevTasks10, standbyTasks10); configureDefaultPartitionAssignor(); - - partitionAssignor.setInternalTopicManager(new MockInternalTopicManager(streamsConfig, mockClientSupplier.restoreConsumer)); + overwriteInternalTopicManagerWithMock(); subscriptions.put("consumer10", new Subscription( @@ -608,8 +619,7 @@ public void shouldAssignEvenlyAcrossConsumersOneClientMultipleThreads() { final TaskId taskIdB3 = new TaskId(1, 3); configureDefault(); - -
partitionAssignor.setInternalTopicManager(new MockInternalTopicManager(streamsConfig, mockClientSupplier.restoreConsumer)); + overwriteInternalTopicManagerWithMock(); subscriptions.put("consumer10", new Subscription( @@ -654,9 +664,12 @@ public void testAssignWithPartialTopology() { final Set allTasks = mkSet(task0_0, task0_1, task0_2); createDefaultMockTaskManager(); + createMockAdminClient(getTopicPartitionOffsetsMap( + singletonList(APPLICATION_ID + "-store1-changelog"), + singletonList(3)) + ); configurePartitionAssignorWith(Collections.singletonMap(StreamsConfig.PARTITION_GROUPER_CLASS_CONFIG, SingleGroupPartitionGrouperStub.class)); - - partitionAssignor.setInternalTopicManager(new MockInternalTopicManager(streamsConfig, mockClientSupplier.restoreConsumer)); + overwriteInternalTopicManagerWithMock(); // will throw exception if it fails subscriptions.put("consumer10", @@ -745,8 +758,7 @@ public void testAssignWithNewTasks() { createMockTaskManager(prevTasks10, emptyTasks); configureDefaultPartitionAssignor(); - - partitionAssignor.setInternalTopicManager(new MockInternalTopicManager(streamsConfig, mockClientSupplier.restoreConsumer)); + overwriteInternalTopicManagerWithMock(); subscriptions.put("consumer10", new Subscription( @@ -804,9 +816,14 @@ public void testAssignWithStates() { final TaskId task12 = new TaskId(1, 2); final List tasks = asList(task00, task01, task02, task10, task11, task12); + createMockAdminClient(getTopicPartitionOffsetsMap( + asList(APPLICATION_ID + "-store1-changelog", + APPLICATION_ID + "-store2-changelog", + APPLICATION_ID + "-store3-changelog"), + asList(3, 3, 3)) + ); configureDefault(); - - partitionAssignor.setInternalTopicManager(new MockInternalTopicManager(streamsConfig, mockClientSupplier.restoreConsumer)); + overwriteInternalTopicManagerWithMock(); subscriptions.put("consumer10", new Subscription(topics, defaultSubscriptionInfo.encode())); @@ -866,10 +883,6 @@ private static Set tasksForState(final String storeName, @Test 
public void testAssignWithStandbyReplicasAndStatelessTasks() { - final Map props = configProps(); - props.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, "1"); - final StreamsConfig streamsConfig = new StreamsConfig(props); - builder.addSource(null, "source1", null, null, null, "topic1", "topic2"); builder.addProcessor("processor", new MockProcessorSupplier(), "source1"); @@ -877,7 +890,7 @@ public void testAssignWithStandbyReplicasAndStatelessTasks() { createMockTaskManager(mkSet(task0_0), emptySet()); configurePartitionAssignorWith(Collections.singletonMap(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1)); - partitionAssignor.setInternalTopicManager(new MockInternalTopicManager(streamsConfig, mockClientSupplier.restoreConsumer)); + overwriteInternalTopicManagerWithMock(); subscriptions.put("consumer10", new Subscription( @@ -900,10 +913,6 @@ public void testAssignWithStandbyReplicasAndStatelessTasks() { @Test public void testAssignWithStandbyReplicasAndLoggingDisabled() { - final Map props = configProps(); - props.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, "1"); - final StreamsConfig streamsConfig = new StreamsConfig(props); - builder.addSource(null, "source1", null, null, null, "topic1", "topic2"); builder.addProcessor("processor", new MockProcessorSupplier(), "source1"); builder.addStateStore(new MockKeyValueStoreBuilder("store1", false).withLoggingDisabled(), "processor"); @@ -912,7 +921,7 @@ public void testAssignWithStandbyReplicasAndLoggingDisabled() { createMockTaskManager(mkSet(task0_0), emptySet()); configurePartitionAssignorWith(Collections.singletonMap(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1)); - partitionAssignor.setInternalTopicManager(new MockInternalTopicManager(streamsConfig, mockClientSupplier.restoreConsumer)); + overwriteInternalTopicManagerWithMock(); subscriptions.put("consumer10", new Subscription( @@ -935,10 +944,6 @@ public void testAssignWithStandbyReplicasAndLoggingDisabled() { @Test public void testAssignWithStandbyReplicas() { - 
final Map props = configProps(); - props.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, "1"); - final StreamsConfig streamsConfig = new StreamsConfig(props); - builder.addSource(null, "source1", null, null, null, "topic1"); builder.addSource(null, "source2", null, null, null, "topic2"); builder.addProcessor("processor", new MockProcessorSupplier(), "source1", "source2"); @@ -960,9 +965,12 @@ public void testAssignWithStandbyReplicas() { final Set standbyTasks02 = mkSet(task0_2); createMockTaskManager(prevTasks00, standbyTasks01); + createMockAdminClient(getTopicPartitionOffsetsMap( + singletonList(APPLICATION_ID + "-store1-changelog"), + singletonList(3)) + ); configurePartitionAssignorWith(Collections.singletonMap(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1)); - - partitionAssignor.setInternalTopicManager(new MockInternalTopicManager(streamsConfig, mockClientSupplier.restoreConsumer)); + overwriteInternalTopicManagerWithMock(); subscriptions.put("consumer10", new Subscription( @@ -1080,16 +1088,14 @@ public void testAssignWithInternalTopics() { final Set allTasks = mkSet(task0_0, task0_1, task0_2); configureDefault(); - - final MockInternalTopicManager internalTopicManager = new MockInternalTopicManager(streamsConfig, mockClientSupplier.restoreConsumer); - partitionAssignor.setInternalTopicManager(internalTopicManager); + final MockInternalTopicManager internalTopicManager = overwriteInternalTopicManagerWithMock(); subscriptions.put("consumer10", new Subscription( topics, defaultSubscriptionInfo.encode()) ); - partitionAssignor.assign(metadata, new GroupSubscription(subscriptions)).groupAssignment(); + partitionAssignor.assign(metadata, new GroupSubscription(subscriptions)); // check prepared internal topics assertEquals(1, internalTopicManager.readyTopics.size()); @@ -1111,17 +1117,14 @@ public void testAssignWithInternalTopicThatsSourceIsAnotherInternalTopic() { final Set allTasks = mkSet(task0_0, task0_1, task0_2); configureDefault(); - - final 
MockInternalTopicManager internalTopicManager = - new MockInternalTopicManager(streamsConfig, mockClientSupplier.restoreConsumer); - partitionAssignor.setInternalTopicManager(internalTopicManager); + final MockInternalTopicManager internalTopicManager = overwriteInternalTopicManagerWithMock(); subscriptions.put("consumer10", new Subscription( topics, defaultSubscriptionInfo.encode()) ); - partitionAssignor.assign(metadata, new GroupSubscription(subscriptions)).groupAssignment(); + partitionAssignor.assign(metadata, new GroupSubscription(subscriptions)); // check prepared internal topics assertEquals(2, internalTopicManager.readyTopics.size()); @@ -1155,12 +1158,14 @@ public void shouldGenerateTasksForAllCreatedPartitions() { final String client = "client1"; builder = TopologyWrapper.getInternalTopologyBuilder(streamsBuilder.build()); - configureDefault(); + createMockAdminClient(getTopicPartitionOffsetsMap( + asList(APPLICATION_ID + "-topic3-STATE-STORE-0000000002-changelog", + APPLICATION_ID + "-KTABLE-AGGREGATE-STATE-STORE-0000000006-changelog"), + asList(4, 4)) + ); - final MockInternalTopicManager mockInternalTopicManager = new MockInternalTopicManager( - streamsConfig, - mockClientSupplier.restoreConsumer); - partitionAssignor.setInternalTopicManager(mockInternalTopicManager); + configureDefault(); + final MockInternalTopicManager mockInternalTopicManager = overwriteInternalTopicManagerWithMock(); subscriptions.put(client, new Subscription( @@ -1168,8 +1173,7 @@ public void shouldGenerateTasksForAllCreatedPartitions() { defaultSubscriptionInfo.encode()) ); final Map assignment = - partitionAssignor.assign(metadata, new GroupSubscription(subscriptions)) - .groupAssignment(); + partitionAssignor.assign(metadata, new GroupSubscription(subscriptions)).groupAssignment(); final Map expectedCreatedInternalTopics = new HashMap<>(); expectedCreatedInternalTopics.put(APPLICATION_ID + "-KTABLE-AGGREGATE-STATE-STORE-0000000006-repartition", 4); @@ -1229,8 +1233,7 @@ 
public void shouldMapUserEndPointToTopicPartitions() { createDefaultMockTaskManager(); configurePartitionAssignorWith(Collections.singletonMap(StreamsConfig.APPLICATION_SERVER_CONFIG, USER_END_POINT)); - - partitionAssignor.setInternalTopicManager(new MockInternalTopicManager(streamsConfig, mockClientSupplier.restoreConsumer)); + overwriteInternalTopicManagerWithMock(); subscriptions.put("consumer1", new Subscription( @@ -1307,11 +1310,7 @@ public void shouldNotLoopInfinitelyOnMissingMetadataAndShouldNotCreateRelatedTas builder = TopologyWrapper.getInternalTopologyBuilder(streamsBuilder.build()); configureDefault(); - - final MockInternalTopicManager mockInternalTopicManager = new MockInternalTopicManager( - streamsConfig, - mockClientSupplier.restoreConsumer); - partitionAssignor.setInternalTopicManager(mockInternalTopicManager); + final MockInternalTopicManager mockInternalTopicManager = overwriteInternalTopicManagerWithMock(); subscriptions.put(client, new Subscription( @@ -1386,14 +1385,17 @@ public void shouldNotAddStandbyTaskPartitionsToPartitionsForHost() { builder = TopologyWrapper.getInternalTopologyBuilder(streamsBuilder.build()); createDefaultMockTaskManager(); + createMockAdminClient(getTopicPartitionOffsetsMap( + singletonList(APPLICATION_ID + "-KSTREAM-AGGREGATE-STATE-STORE-0000000001-changelog"), + singletonList(3)) + ); + final Map props = new HashMap<>(); props.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1); props.put(StreamsConfig.APPLICATION_SERVER_CONFIG, USER_END_POINT); - configurePartitionAssignorWith(props); - partitionAssignor.setInternalTopicManager(new MockInternalTopicManager( - streamsConfig, - mockClientSupplier.restoreConsumer)); + configurePartitionAssignorWith(props); + overwriteInternalTopicManagerWithMock(); subscriptions.put("consumer1", new Subscription( @@ -1618,10 +1620,6 @@ public void shouldReturnInterleavedAssignmentWithUnrevokedPartitionsRemovedWhenN @Test public void 
shouldReturnNormalAssignmentForOldAndFutureInstancesDuringVersionProbing() { - final Map props = configProps(); - props.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, "1"); - final StreamsConfig streamsConfig = new StreamsConfig(props); - builder.addSource(null, "source1", null, null, null, "topic1"); builder.addProcessor("processor", new MockProcessorSupplier(), "source1"); builder.addStateStore(new MockKeyValueStoreBuilder("store1", false), "processor"); @@ -1639,8 +1637,13 @@ public void shouldReturnNormalAssignmentForOldAndFutureInstancesDuringVersionPro ); createMockTaskManager(allTasks, allTasks); + createMockAdminClient(getTopicPartitionOffsetsMap( + singletonList(APPLICATION_ID + "-store1-changelog"), + singletonList(3)) + ); + configurePartitionAssignorWith(Collections.singletonMap(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1)); - partitionAssignor.setInternalTopicManager(new MockInternalTopicManager(streamsConfig, mockClientSupplier.restoreConsumer)); + overwriteInternalTopicManagerWithMock(); subscriptions.put("consumer1", new Subscription( @@ -1797,9 +1800,7 @@ public void shouldNotFailOnBranchedMultiLevelRepartitionConnectedTopology() { configureDefault(); - final MockInternalTopicManager internalTopicManager = - new MockInternalTopicManager(streamsConfig, mockClientSupplier.restoreConsumer); - partitionAssignor.setInternalTopicManager(internalTopicManager); + final MockInternalTopicManager internalTopicManager = overwriteInternalTopicManagerWithMock(); subscriptions.put("consumer10", new Subscription( @@ -1867,6 +1868,39 @@ public void shouldRankPreviousClientAboveEquallyCaughtUpClient() { assertThat(clientRanking, equalTo(expectedClientRanking)); } + @Test + public void shouldRankTaskWithUnknownOffsetSumBelowCaughtUpClientAndClientWithLargeLag() { + final ClientState client1 = EasyMock.createMock(ClientState.class); + final ClientState client2 = EasyMock.createMock(ClientState.class); + final ClientState client3 = 
EasyMock.createMock(ClientState.class); + + expect(client1.lagFor(task0_0)).andReturn(UNKNOWN_OFFSET_SUM); + expect(client2.lagFor(task0_0)).andReturn(50L); + expect(client3.lagFor(task0_0)).andReturn(500L); + + final SortedSet> expectedClientRanking = mkSortedSet( + new RankedClient<>(uuid2, 0L), + new RankedClient<>(uuid1, 1L), + new RankedClient<>(uuid3, 500L) + ); + + replay(client1, client2, client3); + + final Map states = mkMap( + mkEntry(uuid1, client1), + mkEntry(uuid2, client2), + mkEntry(uuid3, client3) + ); + + final Map>> statefulTasksToRankedCandidates = + buildClientRankingsByTask(singleton(task0_0), states, ACCEPTABLE_RECOVERY_LAG); + + final SortedSet> clientRanking = statefulTasksToRankedCandidates.get(task0_0); + + EasyMock.verify(client1, client2, client3); + assertThat(clientRanking, equalTo(expectedClientRanking)); + } + @Test public void shouldRankAllClientsWithinAcceptableRecoveryLagWithRank0() { final ClientState client1 = EasyMock.createMock(ClientState.class); @@ -1925,6 +1959,57 @@ public void shouldRankNotCaughtUpClientsAccordingToLag() { assertThat(statefulTasksToRankedCandidates.get(task0_0), equalTo(expectedClientRanking)); } + @Test + public void shouldThrowIllegalStateExceptionIfAnyPartitionsMissingFromChangelogEndOffsets() { + final int changelogNumPartitions = 3; + builder.addSource(null, "source1", null, null, null, "topic1"); + builder.addProcessor("processor1", new MockProcessorSupplier(), "source1"); + builder.addStateStore(new MockKeyValueStoreBuilder("store1", false), "processor1"); + + createMockAdminClient(getTopicPartitionOffsetsMap( + singletonList(APPLICATION_ID + "-store1-changelog"), + singletonList(changelogNumPartitions - 1)) + ); + + configureDefault(); + overwriteInternalTopicManagerWithMock(); + + subscriptions.put("consumer10", + new Subscription( + singletonList("topic1"), + defaultSubscriptionInfo.encode() + )); + assertThrows(IllegalStateException.class, () -> partitionAssignor.assign(metadata, new 
GroupSubscription(subscriptions))); + } + + @Test + public void shouldThrowIllegalStateExceptionIfAnyTopicsMissingFromChangelogEndOffsets() { + builder.addSource(null, "source1", null, null, null, "topic1"); + builder.addProcessor("processor1", new MockProcessorSupplier(), "source1"); + builder.addStateStore(new MockKeyValueStoreBuilder("store1", false), "processor1"); + builder.addStateStore(new MockKeyValueStoreBuilder("store2", false), "processor1"); + + createMockAdminClient(getTopicPartitionOffsetsMap( + singletonList(APPLICATION_ID + "-store1-changelog"), + singletonList(3)) + ); + + configureDefault(); + overwriteInternalTopicManagerWithMock(); + + subscriptions.put("consumer10", + new Subscription( + singletonList("topic1"), + defaultSubscriptionInfo.encode() + )); + assertThrows(IllegalStateException.class, () -> partitionAssignor.assign(metadata, new GroupSubscription(subscriptions))); + } + + @Test + public void shouldReturnPreviousAssignmentIfEndOffsetFetchFails() { + //TODO-soph + } + private static ByteBuffer encodeFutureSubscription() { final ByteBuffer buf = ByteBuffer.allocate(4 /* used version */ + 4 /* supported version */); buf.putInt(LATEST_SUPPORTED_VERSION + 1); @@ -2013,6 +2098,29 @@ private static void assertEquivalentAssignment(final Map> t } } + /** + * Helper for building the input to createMockAdminClient in cases where we don't care about the actual offsets + * @param changelogTopics The names of all changelog topics in the topology + * @param topicsNumPartitions The number of partitions for the corresponding changelog topic, such that the number + * of partitions of the ith topic in changelogTopics is given by the ith element of topicsNumPartitions + */ + private static Map getTopicPartitionOffsetsMap(final List changelogTopics, + final List topicsNumPartitions) { + if (changelogTopics.size() != topicsNumPartitions.size()) { + throw new IllegalStateException("Passed in " + changelogTopics.size() + " changelog topic names, but " + + 
topicsNumPartitions.size() + " different numPartitions for the topics"); + } + final Map changelogEndOffsets = new HashMap<>(); + for (int i = 0; i < changelogTopics.size(); ++i) { + final String topic = changelogTopics.get(i); + final int numPartitions = topicsNumPartitions.get(i); + for (int partition = 0; partition < numPartitions; ++partition) { + changelogEndOffsets.put(new TopicPartition(topic, partition), 0L); + } + } + return changelogEndOffsets; + } + private static SubscriptionInfo getInfo(final UUID processId, final Set prevTasks, final Set standbyTasks) { diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/ClientStateTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/ClientStateTest.java index 6d3c5e776e1e0..170125b2d2682 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/ClientStateTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/ClientStateTest.java @@ -27,6 +27,7 @@ import static org.apache.kafka.common.utils.Utils.mkEntry; import static org.apache.kafka.common.utils.Utils.mkMap; import static org.apache.kafka.common.utils.Utils.mkSet; +import static org.apache.kafka.streams.processor.internals.assignment.SubscriptionInfo.UNKNOWN_OFFSET_SUM; import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.MatcherAssert.assertThat; import static org.junit.Assert.assertFalse; @@ -222,6 +223,15 @@ public void shouldReturnLatestOffsetForLagOfPreviousActiveRunningTask() { assertThat(client.lagFor(taskId01), equalTo(Task.LATEST_OFFSET)); } + @Test + public void shouldReturnUnknownOffsetSumForLagOfTaskWithUnknownOffset() { + final Map taskOffsetSums = Collections.singletonMap(taskId01, UNKNOWN_OFFSET_SUM); + final Map allTaskEndOffsetSums = Collections.singletonMap(taskId01, 500L); + client.addPreviousTasksAndOffsetSums(taskOffsetSums); + client.computeTaskLags(allTaskEndOffsetSums); + 
assertThat(client.lagFor(taskId01), equalTo(UNKNOWN_OFFSET_SUM)); + } + @Test public void shouldThrowIllegalStateExceptionIfOffsetSumIsGreaterThanEndOffsetSum() { final Map taskOffsetSums = Collections.singletonMap(taskId01, 5L); From e67aaef709572c7320869e38e23fac0fef87e354 Mon Sep 17 00:00:00 2001 From: ableegoldman Date: Thu, 19 Mar 2020 16:23:39 -0700 Subject: [PATCH 3/7] can't resist further assignor test cleanup --- .../StreamsPartitionAssignorTest.java | 44 ++++++------------- 1 file changed, 13 insertions(+), 31 deletions(-) diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java index 20b43cd948f3e..d6938ccf28333 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java @@ -79,7 +79,6 @@ import static java.util.Collections.emptySet; import static java.util.Collections.singleton; import static java.util.Collections.singletonList; -import static java.util.Collections.singletonMap; import static org.apache.kafka.common.utils.Utils.mkEntry; import static org.apache.kafka.common.utils.Utils.mkMap; import static org.apache.kafka.common.utils.Utils.mkSet; @@ -211,24 +210,26 @@ private Map configProps() { return configurationMap; } - private void configureDefault() { + private MockInternalTopicManager configureDefault() { createDefaultMockTaskManager(); - configureDefaultPartitionAssignor(); + return configureDefaultPartitionAssignor(); } // Make sure to complete setting up any mocks (such as TaskManager or AdminClient) before configuring the assignor - private void configureDefaultPartitionAssignor() { - configurePartitionAssignorWith(emptyMap()); + private MockInternalTopicManager configureDefaultPartitionAssignor() { + return 
configurePartitionAssignorWith(emptyMap()); } // Make sure to complete setting up any mocks (such as TaskManager or AdminClient) before configuring the assignor - private void configurePartitionAssignorWith(final Map props) { + private MockInternalTopicManager configurePartitionAssignorWith(final Map props) { final Map configMap = configProps(); configMap.putAll(props); streamsConfig = new StreamsConfig(configMap); partitionAssignor.configure(configMap); EasyMock.replay(taskManager, adminClient); + + return overwriteInternalTopicManagerWithMock(); } private void createDefaultMockTaskManager() { @@ -251,8 +252,8 @@ private void createMockTaskManager(final Map taskOffsetSums, builder.buildTopology(); } - // If you don't care about settings the end offsets for each specific topic partition, the helper method - // getTopicPartitionOffsetMap is useful for building this input map based on the topics and number of partitions + // If you don't care about setting the end offsets for each specific topic partition, the helper method + // getTopicPartitionOffsetMap is useful for building this input map for all partitions private void createMockAdminClient(final Map changelogEndOffsets) { adminClient = EasyMock.createMock(AdminClient.class); @@ -532,7 +533,6 @@ public void testAssignBasic() { createMockTaskManager(prevTasks10, standbyTasks10); configureDefaultPartitionAssignor(); - overwriteInternalTopicManagerWithMock(); subscriptions.put("consumer10", new Subscription( @@ -619,7 +619,6 @@ public void shouldAssignEvenlyAcrossConsumersOneClientMultipleThreads() { final TaskId taskIdB3 = new TaskId(1, 3); configureDefault(); - overwriteInternalTopicManagerWithMock(); subscriptions.put("consumer10", new Subscription( @@ -669,7 +668,6 @@ public void testAssignWithPartialTopology() { singletonList(3)) ); configurePartitionAssignorWith(Collections.singletonMap(StreamsConfig.PARTITION_GROUPER_CLASS_CONFIG, SingleGroupPartitionGrouperStub.class)); - 
overwriteInternalTopicManagerWithMock(); // will throw exception if it fails subscriptions.put("consumer10", @@ -758,7 +756,6 @@ public void testAssignWithNewTasks() { createMockTaskManager(prevTasks10, emptyTasks); configureDefaultPartitionAssignor(); - overwriteInternalTopicManagerWithMock(); subscriptions.put("consumer10", new Subscription( @@ -823,7 +820,6 @@ public void testAssignWithStates() { asList(3, 3, 3)) ); configureDefault(); - overwriteInternalTopicManagerWithMock(); subscriptions.put("consumer10", new Subscription(topics, defaultSubscriptionInfo.encode())); @@ -890,7 +886,6 @@ public void testAssignWithStandbyReplicasAndStatelessTasks() { createMockTaskManager(mkSet(task0_0), emptySet()); configurePartitionAssignorWith(Collections.singletonMap(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1)); - overwriteInternalTopicManagerWithMock(); subscriptions.put("consumer10", new Subscription( @@ -921,7 +916,6 @@ public void testAssignWithStandbyReplicasAndLoggingDisabled() { createMockTaskManager(mkSet(task0_0), emptySet()); configurePartitionAssignorWith(Collections.singletonMap(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1)); - overwriteInternalTopicManagerWithMock(); subscriptions.put("consumer10", new Subscription( @@ -970,7 +964,6 @@ public void testAssignWithStandbyReplicas() { singletonList(3)) ); configurePartitionAssignorWith(Collections.singletonMap(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1)); - overwriteInternalTopicManagerWithMock(); subscriptions.put("consumer10", new Subscription( @@ -1087,8 +1080,7 @@ public void testAssignWithInternalTopics() { final List topics = asList("topic1", APPLICATION_ID + "-topicX"); final Set allTasks = mkSet(task0_0, task0_1, task0_2); - configureDefault(); - final MockInternalTopicManager internalTopicManager = overwriteInternalTopicManagerWithMock(); + final MockInternalTopicManager internalTopicManager = configureDefault(); subscriptions.put("consumer10", new Subscription( @@ -1116,8 +1108,7 @@ public void 
testAssignWithInternalTopicThatsSourceIsAnotherInternalTopic() { final List topics = asList("topic1", APPLICATION_ID + "-topicX", APPLICATION_ID + "-topicZ"); final Set allTasks = mkSet(task0_0, task0_1, task0_2); - configureDefault(); - final MockInternalTopicManager internalTopicManager = overwriteInternalTopicManagerWithMock(); + final MockInternalTopicManager internalTopicManager = configureDefault(); subscriptions.put("consumer10", new Subscription( @@ -1164,8 +1155,7 @@ public void shouldGenerateTasksForAllCreatedPartitions() { asList(4, 4)) ); - configureDefault(); - final MockInternalTopicManager mockInternalTopicManager = overwriteInternalTopicManagerWithMock(); + final MockInternalTopicManager mockInternalTopicManager = configureDefault(); subscriptions.put(client, new Subscription( @@ -1233,7 +1223,6 @@ public void shouldMapUserEndPointToTopicPartitions() { createDefaultMockTaskManager(); configurePartitionAssignorWith(Collections.singletonMap(StreamsConfig.APPLICATION_SERVER_CONFIG, USER_END_POINT)); - overwriteInternalTopicManagerWithMock(); subscriptions.put("consumer1", new Subscription( @@ -1309,8 +1298,7 @@ public void shouldNotLoopInfinitelyOnMissingMetadataAndShouldNotCreateRelatedTas builder = TopologyWrapper.getInternalTopologyBuilder(streamsBuilder.build()); - configureDefault(); - final MockInternalTopicManager mockInternalTopicManager = overwriteInternalTopicManagerWithMock(); + final MockInternalTopicManager mockInternalTopicManager = configureDefault(); subscriptions.put(client, new Subscription( @@ -1395,7 +1383,6 @@ public void shouldNotAddStandbyTaskPartitionsToPartitionsForHost() { props.put(StreamsConfig.APPLICATION_SERVER_CONFIG, USER_END_POINT); configurePartitionAssignorWith(props); - overwriteInternalTopicManagerWithMock(); subscriptions.put("consumer1", new Subscription( @@ -1643,7 +1630,6 @@ public void shouldReturnNormalAssignmentForOldAndFutureInstancesDuringVersionPro ); 
configurePartitionAssignorWith(Collections.singletonMap(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1)); - overwriteInternalTopicManagerWithMock(); subscriptions.put("consumer1", new Subscription( @@ -1800,8 +1786,6 @@ public void shouldNotFailOnBranchedMultiLevelRepartitionConnectedTopology() { configureDefault(); - final MockInternalTopicManager internalTopicManager = overwriteInternalTopicManagerWithMock(); - subscriptions.put("consumer10", new Subscription( topics, @@ -1972,7 +1956,6 @@ public void shouldThrowIllegalStateExceptionIfAnyPartitionsMissingFromChangelogE ); configureDefault(); - overwriteInternalTopicManagerWithMock(); subscriptions.put("consumer10", new Subscription( @@ -1995,7 +1978,6 @@ public void shouldThrowIllegalStateExceptionIfAnyTopicsMissingFromChangelogEndOf ); configureDefault(); - overwriteInternalTopicManagerWithMock(); subscriptions.put("consumer10", new Subscription( From 5a5f53faff235167d05c18b7ceb06cbf12b44af5 Mon Sep 17 00:00:00 2001 From: ableegoldman Date: Thu, 19 Mar 2020 16:51:45 -0700 Subject: [PATCH 4/7] don't violate active task stickiness if offset fetch fails and add tests --- .../internals/StreamsPartitionAssignor.java | 35 ++++----- .../assignment/AssignorConfiguration.java | 10 +-- .../internals/assignment/ClientState.java | 75 ++++++++++--------- .../assignment/StickyTaskAssignor.java | 19 ++++- .../internals/assignment/TaskAssignor.java | 2 +- .../StreamsPartitionAssignorTest.java | 37 ++++++++- .../internals/assignment/ClientStateTest.java | 32 ++++---- .../assignment/StickyTaskAssignorTest.java | 13 ++++ 8 files changed, 144 insertions(+), 79 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java index 1f6277689d6df..c6b84581474ad 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java +++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java @@ -402,7 +402,6 @@ public GroupAssignment assign(final Cluster metadata, final GroupSubscription gr final Map> partitionsForTask = partitionGrouper.partitionGroups(sourceTopicsByGroup, fullMetadata); - assignTasksToClients(allSourceTopics, partitionsForTask, topicGroups, clientMetadataMap, fullMetadata); // ---------------- Step Three ---------------- // @@ -747,21 +746,23 @@ private void assignTasksToClients(final Set allSourceTopics, populateClientStatesMap(clientStates, clientMetadataMap, taskForPartition, changelogsByStatefulTask); // assign tasks to clients + final Set allTasks = partitionsForTask.keySet(); + final Set standbyTasks = changelogsByStatefulTask.keySet(); + if (lagComputationSuccessful) { final Map>> statefulTasksToRankedCandidates = - buildClientRankingsByTask(changelogsByStatefulTask.keySet(), clientStates, assignmentConfigs.acceptableRecoveryLag); + buildClientRankingsByTask(standbyTasks, clientStates, acceptableRecoveryLag()); + log.trace("Computed statefulTasksToRankedCandidates map as {}", statefulTasksToRankedCandidates); + } - log.debug("Assigning tasks {} to clients {} with number of replicas {}", - partitionsForTask.keySet(), clientStates, assignmentConfigs.numStandbyReplicas); + log.debug("Assigning tasks {} to clients {} with number of replicas {}", + allTasks, clientStates, numStandbyReplicas()); - final StickyTaskAssignor taskAssignor = - new StickyTaskAssignor<>(clientStates, partitionsForTask.keySet(), statefulTasksToRankedCandidates.keySet()); - taskAssignor.assign(assignmentConfigs.numStandbyReplicas); - } else { - log.debug("Failed to fetch end offsets and compute task lags, will return tasks to previous owners then retry"); - // give tasks back to previous owners (based on prevActiveTasks and prevStandbyTasks), distribute any "unowned" tasks - //TODO-soph + final StickyTaskAssignor taskAssignor = new 
StickyTaskAssignor<>(clientStates, allTasks, standbyTasks); + if (!lagComputationSuccessful) { + taskAssignor.preservePreviousTaskAssignment(); } + taskAssignor.assign(numStandbyReplicas()); log.info("Assigned tasks to clients as {}{}.", Utils.NL, clientStates.entrySet().stream().map(Map.Entry::toString).collect(Collectors.joining(Utils.NL))); @@ -790,11 +791,11 @@ private boolean populateClientStatesMap(final Map clientState .collect(Collectors.toList()); final Map endOffsets = fetchEndOffsets(allChangelogPartitions, adminClient); allTaskEndOffsetSums = computeEndOffsetSumsByTask(endOffsets, changelogsByStatefulTask); - fetchEndOffsetsSuccessful = true; } catch (final StreamsException e) { allTaskEndOffsetSums = null; fetchEndOffsetsSuccessful = false; + setAssignmentErrorCode(AssignorError.REBALANCE_NEEDED.code()); } for (final Map.Entry entry : clientMetadataMap.entrySet()) { @@ -1601,23 +1602,23 @@ protected void handleRebalanceStart(final Set topics) { taskManager.handleRebalanceStart(topics); } - long acceptableRecoveryLag() { + Long acceptableRecoveryLag() { return assignmentConfigs.acceptableRecoveryLag; } - int balanceFactor() { + Integer balanceFactor() { return assignmentConfigs.balanceFactor; } - int maxWarmupReplicas() { + Integer maxWarmupReplicas() { return assignmentConfigs.maxWarmupReplicas; } - int numStandbyReplicas() { + Integer numStandbyReplicas() { return assignmentConfigs.numStandbyReplicas; } - long probingRebalanceIntervalMs() { + Long probingRebalanceIntervalMs() { return assignmentConfigs.probingRebalanceIntervalMs; } diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignorConfiguration.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignorConfiguration.java index 66f528c4b1fd2..726a04919f344 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignorConfiguration.java +++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignorConfiguration.java @@ -269,11 +269,11 @@ public AssignmentConfigs getAssignmentConfigs() { } public static class AssignmentConfigs { - public final long acceptableRecoveryLag; - public final int balanceFactor; - public final int maxWarmupReplicas; - public final int numStandbyReplicas; - public final long probingRebalanceIntervalMs; + public final Long acceptableRecoveryLag; + public final Integer balanceFactor; + public final Integer maxWarmupReplicas; + public final Integer numStandbyReplicas; + public final Long probingRebalanceIntervalMs; AssignmentConfigs(final StreamsConfig configs) { acceptableRecoveryLag = configs.getLong(StreamsConfig.ACCEPTABLE_RECOVERY_LAG_CONFIG); diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/ClientState.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/ClientState.java index 7a84f921a8cb2..9a522edddabfa 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/ClientState.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/ClientState.java @@ -95,14 +95,24 @@ public ClientState copy() { capacity); } - public void assign(final TaskId taskId, final boolean active) { - if (active) { - activeTasks.add(taskId); - } else { - standbyTasks.add(taskId); - } + void assignActive(final TaskId task) { + activeTasks.add(task); + assignedTasks.add(task); + } + + void assignStandby(final TaskId task) { + standbyTasks.add(task); + assignedTasks.add(task); + } - assignedTasks.add(taskId); + public void assignActiveTasks(final Collection tasks) { + activeTasks.addAll(tasks); + assignedTasks.addAll(tasks); + } + + void assignStandbyTasks(final Collection tasks) { + standbyTasks.addAll(tasks); + assignedTasks.addAll(tasks); } public Set activeTasks() { @@ -113,7 +123,7 @@ public Set standbyTasks() { return standbyTasks; } 
- public Set prevActiveTasks() { + Set prevActiveTasks() { return prevActiveTasks; } @@ -142,6 +152,7 @@ public int activeTaskCount() { public void addPreviousActiveTasks(final Set prevTasks) { prevActiveTasks.addAll(prevTasks); prevAssignedTasks.addAll(prevTasks); + prevStandbyTasks.removeAll(prevTasks); } void addPreviousStandbyTasks(final Set standbyTasks) { @@ -219,24 +230,18 @@ public void removeFromAssignment(final TaskId task) { assignedTasks.remove(task); } - @Override - public String toString() { - return "[activeTasks: (" + activeTasks + - ") standbyTasks: (" + standbyTasks + - ") assignedTasks: (" + assignedTasks + - ") prevActiveTasks: (" + prevActiveTasks + - ") prevStandbyTasks: (" + prevStandbyTasks + - ") prevAssignedTasks: (" + prevAssignedTasks + - ") prevOwnedPartitionsByConsumerId: (" + ownedPartitions.keySet() + - ") changelogOffsetTotalsByTask: (" + taskOffsetSums.entrySet() + - ") capacity: " + capacity + - "]"; - } - boolean reachedCapacity() { return assignedTasks.size() >= capacity; } + int capacity() { + return capacity; + } + + boolean hasUnfulfilledQuota(final int tasksPerThread) { + return activeTasks.size() < capacity * tasksPerThread; + } + boolean hasMoreAvailableCapacityThan(final ClientState other) { if (this.capacity <= 0) { throw new IllegalStateException("Capacity of this ClientState must be greater than 0."); @@ -262,6 +267,20 @@ boolean hasAssignedTask(final TaskId taskId) { return assignedTasks.contains(taskId); } + @Override + public String toString() { + return "[activeTasks: (" + activeTasks + + ") standbyTasks: (" + standbyTasks + + ") assignedTasks: (" + assignedTasks + + ") prevActiveTasks: (" + prevActiveTasks + + ") prevStandbyTasks: (" + prevStandbyTasks + + ") prevAssignedTasks: (" + prevAssignedTasks + + ") prevOwnedPartitionsByConsumerId: (" + ownedPartitions.keySet() + + ") changelogOffsetTotalsByTask: (" + taskOffsetSums.entrySet() + + ") capacity: " + capacity + + "]"; + } + // Visible for testing Set 
assignedTasks() { return assignedTasks; @@ -271,16 +290,4 @@ Set previousAssignedTasks() { return prevAssignedTasks; } - int capacity() { - return capacity; - } - - boolean hasUnfulfilledQuota(final int tasksPerThread) { - return activeTasks.size() < capacity * tasksPerThread; - } - - // the following methods are used for testing only - public void assignActiveTasks(final Collection tasks) { - activeTasks.addAll(tasks); - } } \ No newline at end of file diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/StickyTaskAssignor.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/StickyTaskAssignor.java index 64ff5e6c90102..f40ff8514b81d 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/StickyTaskAssignor.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/StickyTaskAssignor.java @@ -31,7 +31,7 @@ import java.util.Objects; import java.util.Set; -public class StickyTaskAssignor implements TaskAssignor { +public class StickyTaskAssignor implements TaskAssignor { private static final Logger log = LoggerFactory.getLogger(StickyTaskAssignor.class); private final Map clients; @@ -41,12 +41,15 @@ public class StickyTaskAssignor implements TaskAssignor { private final Map> previousStandbyTaskAssignment = new HashMap<>(); private final TaskPairs taskPairs; + private boolean mustPreserveActiveTaskAssignment; + public StickyTaskAssignor(final Map clients, final Set allTaskIds, final Set standbyTaskIds) { this.clients = clients; this.allTaskIds = allTaskIds; this.standbyTaskIds = standbyTaskIds; + this.mustPreserveActiveTaskAssignment = false; final int maxPairs = allTaskIds.size() * (allTaskIds.size() - 1) / 2; taskPairs = new TaskPairs(maxPairs); @@ -59,6 +62,10 @@ public void assign(final int numStandbyReplicas) { assignStandby(numStandbyReplicas); } + public void preservePreviousTaskAssignment() { + mustPreserveActiveTaskAssignment 
= true; + } + private void assignStandby(final int numStandbyReplicas) { for (final TaskId taskId : standbyTaskIds) { for (int i = 0; i < numStandbyReplicas; i++) { @@ -88,7 +95,7 @@ private void assignActive() { final TaskId taskId = entry.getKey(); if (allTaskIds.contains(taskId)) { final ClientState client = clients.get(entry.getValue()); - if (client.hasUnfulfilledQuota(tasksPerThread)) { + if (mustPreserveActiveTaskAssignment || client.hasUnfulfilledQuota(tasksPerThread)) { assignTaskToClient(assigned, taskId, client); } } @@ -125,12 +132,16 @@ private void assignActive() { private void allocateTaskWithClientCandidates(final TaskId taskId, final Set clientsWithin, final boolean active) { final ClientState client = findClient(taskId, clientsWithin); taskPairs.addPairs(taskId, client.assignedTasks()); - client.assign(taskId, active); + if (active) { + client.assignActive(taskId); + } else { + client.assignStandby(taskId); + } } private void assignTaskToClient(final Set assigned, final TaskId taskId, final ClientState client) { taskPairs.addPairs(taskId, client.assignedTasks()); - client.assign(taskId, true); + client.assignActive(taskId); assigned.add(taskId); } diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/TaskAssignor.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/TaskAssignor.java index 162add0bd4382..679e416014e17 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/TaskAssignor.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/TaskAssignor.java @@ -16,6 +16,6 @@ */ package org.apache.kafka.streams.processor.internals.assignment; -public interface TaskAssignor> { +public interface TaskAssignor { void assign(int numStandbyReplicas); } diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java index d6938ccf28333..cc40ad7501687 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java @@ -37,6 +37,7 @@ import org.apache.kafka.streams.StreamsBuilder; import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.TopologyWrapper; +import org.apache.kafka.streams.errors.StreamsException; import org.apache.kafka.streams.kstream.JoinWindows; import org.apache.kafka.streams.kstream.KStream; import org.apache.kafka.streams.kstream.KTable; @@ -1988,8 +1989,40 @@ public void shouldThrowIllegalStateExceptionIfAnyTopicsMissingFromChangelogEndOf } @Test - public void shouldReturnPreviousAssignmentIfEndOffsetFetchFails() { - //TODO-soph + public void shouldReturnAllActiveTasksToPreviousOwnerRegardlessOfBalanceIfEndOffsetFetchFails() { + builder.addSource(null, "source1", null, null, null, "topic1"); + builder.addProcessor("processor1", new MockProcessorSupplier(), "source1"); + builder.addStateStore(new MockKeyValueStoreBuilder("store1", false), "processor1"); + final Set allTasks = mkSet(task0_0, task0_1, task0_2); + + createMockTaskManager(allTasks, emptyTasks); + adminClient = EasyMock.createMock(AdminClient.class); + expect(adminClient.listOffsets(anyObject())).andThrow(new StreamsException("Should be handled")); + configureDefaultPartitionAssignor(); + + final String firstConsumer = "consumer1"; + final String newConsumer = "consumer2"; + + subscriptions.put(firstConsumer, + new Subscription( + singletonList("source1"), + getInfo(uuid1, allTasks, emptyTasks).encode() + )); + subscriptions.put(newConsumer, + new Subscription( + singletonList("source1"), + getInfo(uuid2, emptyTasks, emptyTasks).encode() + )); + + final Map assignments = partitionAssignor.assign(metadata, new 
GroupSubscription(subscriptions)).groupAssignment(); + + final List firstConsumerActiveTasks = + AssignmentInfo.decode(assignments.get(firstConsumer).userData()).activeTasks(); + final List newConsumerActiveTasks = + AssignmentInfo.decode(assignments.get(newConsumer).userData()).activeTasks(); + + assertThat(firstConsumerActiveTasks, equalTo(new ArrayList<>(allTasks))); + assertTrue(newConsumerActiveTasks.isEmpty()); } private static ByteBuffer encodeFutureSubscription() { diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/ClientStateTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/ClientStateTest.java index 170125b2d2682..f08ae1ae28e95 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/ClientStateTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/ClientStateTest.java @@ -49,13 +49,13 @@ public void shouldHaveNotReachedCapacityWhenAssignedTasksLessThanCapacity() { @Test public void shouldHaveReachedCapacityWhenAssignedTasksGreaterThanOrEqualToCapacity() { - client.assign(taskId01, true); + client.assignActive(taskId01); assertTrue(client.reachedCapacity()); } @Test public void shouldAddActiveTasksToBothAssignedAndActive() { - client.assign(taskId01, true); + client.assignActive(taskId01); assertThat(client.activeTasks(), equalTo(Collections.singleton(taskId01))); assertThat(client.assignedTasks(), equalTo(Collections.singleton(taskId01))); assertThat(client.assignedTaskCount(), equalTo(1)); @@ -63,8 +63,8 @@ public void shouldAddActiveTasksToBothAssignedAndActive() { } @Test - public void shouldAddStandbyTasksToBothStandbyAndActive() { - client.assign(taskId01, false); + public void shouldAddStandbyTasksToBothStandbyAndAssigned() { + client.assignStandby(taskId01); assertThat(client.assignedTasks(), equalTo(Collections.singleton(taskId01))); assertThat(client.standbyTasks(), 
equalTo(Collections.singleton(taskId01))); assertThat(client.assignedTaskCount(), equalTo(1)); @@ -79,7 +79,7 @@ public void shouldAddPreviousActiveTasksToPreviousAssignedAndPreviousActive() { } @Test - public void shouldAddPreviousStandbyTasksToPreviousAssigned() { + public void shouldAddPreviousStandbyTasksToPreviousAssignedAndPreviousStandby() { client.addPreviousStandbyTasks(Utils.mkSet(taskId01, taskId02)); assertThat(client.prevActiveTasks().size(), equalTo(0)); assertThat(client.previousAssignedTasks(), equalTo(Utils.mkSet(taskId01, taskId02))); @@ -87,26 +87,26 @@ public void shouldAddPreviousStandbyTasksToPreviousAssigned() { @Test public void shouldHaveAssignedTaskIfActiveTaskAssigned() { - client.assign(taskId01, true); + client.assignActive(taskId01); assertTrue(client.hasAssignedTask(taskId01)); } @Test public void shouldHaveAssignedTaskIfStandbyTaskAssigned() { - client.assign(taskId01, false); + client.assignStandby(taskId01); assertTrue(client.hasAssignedTask(taskId01)); } @Test public void shouldNotHaveAssignedTaskIfTaskNotAssigned() { - client.assign(taskId01, true); + client.assignActive(taskId01); assertFalse(client.hasAssignedTask(taskId02)); } @Test public void shouldHaveMoreAvailableCapacityWhenCapacityTheSameButFewerAssignedTasks() { final ClientState otherClient = new ClientState(1); - client.assign(taskId01, true); + client.assignActive(taskId01); assertTrue(otherClient.hasMoreAvailableCapacityThan(client)); assertFalse(client.hasMoreAvailableCapacityThan(otherClient)); } @@ -123,11 +123,11 @@ public void shouldUseMultiplesOfCapacityToDetermineClientWithMoreAvailableCapaci final ClientState otherClient = new ClientState(2); for (int i = 0; i < 7; i++) { - otherClient.assign(new TaskId(0, i), true); + otherClient.assignActive(new TaskId(0, i)); } for (int i = 7; i < 11; i++) { - client.assign(new TaskId(0, i), true); + client.assignActive(new TaskId(0, i)); } assertTrue(otherClient.hasMoreAvailableCapacityThan(client)); @@ -138,10 +138,10 
@@ public void shouldHaveMoreAvailableCapacityWhenCapacityIsTheSameButAssignedTasks final ClientState client = new ClientState(3); final ClientState otherClient = new ClientState(3); for (int i = 0; i < 4; i++) { - client.assign(new TaskId(0, i), true); - otherClient.assign(new TaskId(0, i), true); + client.assignActive(new TaskId(0, i)); + otherClient.assignActive(new TaskId(0, i)); } - otherClient.assign(new TaskId(0, 5), true); + otherClient.assignActive(new TaskId(0, 5)); assertTrue(client.hasMoreAvailableCapacityThan(otherClient)); } @@ -157,13 +157,13 @@ public void shouldThrowIllegalStateExceptionIfCapacityOfOtherClientStateIsZero() @Test public void shouldHaveUnfulfilledQuotaWhenActiveTaskSizeLessThanCapacityTimesTasksPerThread() { - client.assign(new TaskId(0, 1), true); + client.assignActive(new TaskId(0, 1)); assertTrue(client.hasUnfulfilledQuota(2)); } @Test public void shouldNotHaveUnfulfilledQuotaWhenActiveTaskSizeGreaterEqualThanCapacityTimesTasksPerThread() { - client.assign(new TaskId(0, 1), true); + client.assignActive(new TaskId(0, 1)); assertFalse(client.hasUnfulfilledQuota(1)); } diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/StickyTaskAssignorTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/StickyTaskAssignorTest.java index 3fb284ad4a113..47254bb21625f 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/StickyTaskAssignorTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/StickyTaskAssignorTest.java @@ -674,6 +674,19 @@ public void shouldAssignTasksToNewClientWithoutFlippingAssignmentBetweenExisting assertThat(newClient.activeTaskCount(), equalTo(2)); } + @Test + public void shouldViolateBalanceToPreserveActiveTaskStickiness() { + final ClientState c1 = createClientWithPreviousActiveTasks(p1, 1, task00, task01, task02); + final ClientState c2 = createClient(p2, 1); + + final 
StickyTaskAssignor taskAssignor = createTaskAssignor(task00, task01, task02); + taskAssignor.preservePreviousTaskAssignment(); + taskAssignor.assign(0); + + assertThat(c1.activeTasks(), equalTo(Utils.mkSet(task00, task01, task02))); + assertTrue(c2.activeTasks().isEmpty()); + } + private StickyTaskAssignor createTaskAssignor(final TaskId... tasks) { final List taskIds = Arrays.asList(tasks); Collections.shuffle(taskIds); From 3cf72e06965a476424cd40f213958c67306f66ea Mon Sep 17 00:00:00 2001 From: ableegoldman Date: Thu, 19 Mar 2020 21:03:44 -0700 Subject: [PATCH 5/7] timeout admin client request --- checkstyle/suppressions.xml | 2 +- .../org/apache/kafka/common/utils/Utils.java | 2 -- .../apache/kafka/streams/KafkaStreams.java | 27 ++++++++++++++----- .../internals/StreamsPartitionAssignor.java | 6 ++++- .../assignment/AssignorConfiguration.java | 7 +++++ .../kafka/streams/KafkaStreamsTest.java | 23 +++++++++++++--- .../StreamsPartitionAssignorTest.java | 13 +++++++++ 7 files changed, 67 insertions(+), 13 deletions(-) diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml index 4a062627e7fdb..2f3186ce67147 100644 --- a/checkstyle/suppressions.xml +++ b/checkstyle/suppressions.xml @@ -12,7 +12,7 @@ + files="(MessageDataGenerator|FieldSpec|AssignorConfiguration).java"/> > allLocalStorePartitionLags() { } log.debug("Current changelog positions: {}", allChangelogPositions); - final Map allEndOffsets = fetchEndOffsets(allPartitions, adminClient); + final Map allEndOffsets = fetchEndOffsetsWithoutTimeout(allPartitions, adminClient); log.debug("Current end offsets :{}", allEndOffsets); for (final Map.Entry entry : allEndOffsets.entrySet()) { @@ -1237,14 +1239,27 @@ public Map> allLocalStorePartitionLags() { return Collections.unmodifiableMap(localStorePartitionLags); } + static Map fetchEndOffsetsWithoutTimeout(final Collection partitions, + final Admin adminClient) { + return fetchEndOffsets(partitions, adminClient, null); + } + public static Map 
fetchEndOffsets(final Collection partitions, - final Admin adminClient) { + final Admin adminClient, + final Duration timeout) { + final Map endOffsets; try { - return adminClient.listOffsets( - partitions.stream().collect(Collectors.toMap(Function.identity(), tp -> OffsetSpec.latest())) - ).all().get(); - } catch (final RuntimeException | InterruptedException | ExecutionException e) { + final KafkaFuture> future = adminClient.listOffsets( + partitions.stream().collect(Collectors.toMap(Function.identity(), tp -> OffsetSpec.latest()))) + .all(); + if (timeout == null) { + endOffsets = future.get(); + } else { + endOffsets = future.get(timeout.toMillis(), TimeUnit.MILLISECONDS); + } + } catch (final TimeoutException | RuntimeException | InterruptedException | ExecutionException e) { throw new StreamsException("Unable to obtain end offsets from kafka", e); } + return endOffsets; } } diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java index c6b84581474ad..02d0cebbca4bd 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java @@ -16,6 +16,7 @@ */ package org.apache.kafka.streams.processor.internals; +import java.time.Duration; import java.util.Objects; import java.util.SortedSet; import java.util.TreeSet; @@ -210,6 +211,7 @@ public int hashCode() { protected int usedSubscriptionMetadataVersion = LATEST_SUPPORTED_VERSION; private Admin adminClient; + private int adminClientTimeout; private InternalTopicManager internalTopicManager; private CopartitionedTopicsEnforcer copartitionedTopicsEnforcer; private RebalanceProtocol rebalanceProtocol; @@ -236,6 +238,7 @@ public void configure(final Map configs) { partitionGrouper = assignorConfiguration.getPartitionGrouper(); 
userEndPoint = assignorConfiguration.getUserEndPoint(); adminClient = assignorConfiguration.getAdminClient(); + adminClientTimeout = assignorConfiguration.getAdminClientTimeout(); internalTopicManager = assignorConfiguration.getInternalTopicManager(); copartitionedTopicsEnforcer = assignorConfiguration.getCopartitionedTopicsEnforcer(); rebalanceProtocol = assignorConfiguration.rebalanceProtocol(); @@ -789,7 +792,8 @@ private boolean populateClientStatesMap(final Map clientState final Collection allChangelogPartitions = changelogsByStatefulTask.values().stream() .flatMap(Collection::stream) .collect(Collectors.toList()); - final Map endOffsets = fetchEndOffsets(allChangelogPartitions, adminClient); + final Map endOffsets = + fetchEndOffsets(allChangelogPartitions, adminClient, Duration.ofMillis(adminClientTimeout)); allTaskEndOffsetSums = computeEndOffsetSumsByTask(endOffsets, changelogsByStatefulTask); fetchEndOffsetsSuccessful = true; } catch (final StreamsException e) { diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignorConfiguration.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignorConfiguration.java index 726a04919f344..ad16d0ac8acad 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignorConfiguration.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignorConfiguration.java @@ -18,6 +18,7 @@ import org.apache.kafka.clients.CommonClientConfigs; import org.apache.kafka.clients.admin.Admin; +import org.apache.kafka.clients.admin.AdminClientConfig; import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor.RebalanceProtocol; import org.apache.kafka.common.KafkaException; import org.apache.kafka.common.config.ConfigException; @@ -46,6 +47,7 @@ public final class AssignorConfiguration { private final TaskManager taskManager; private final StreamsMetadataState streamsMetadataState; 
private final Admin adminClient; + private final int adminClientTimeout; private final InternalTopicManager internalTopicManager; private final CopartitionedTopicsEnforcer copartitionedTopicsEnforcer; private final StreamsConfig streamsConfig; @@ -149,6 +151,7 @@ public AssignorConfiguration(final Map configs) { internalTopicManager = new InternalTopicManager(adminClient, streamsConfig); } + adminClientTimeout = streamsConfig.getInt(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG); copartitionedTopicsEnforcer = new CopartitionedTopicsEnforcer(logPrefix); } @@ -256,6 +259,10 @@ public Admin getAdminClient() { return adminClient; } + public int getAdminClientTimeout() { + return adminClientTimeout; + } + public InternalTopicManager getInternalTopicManager() { return internalTopicManager; } diff --git a/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java index 72dddc7f4c14d..bc1ca1dab8520 100644 --- a/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java @@ -17,6 +17,7 @@ package org.apache.kafka.streams; import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeoutException; import org.apache.kafka.clients.admin.Admin; import org.apache.kafka.clients.admin.AdminClient; import org.apache.kafka.clients.admin.ListOffsetsResult; @@ -88,6 +89,7 @@ import static java.util.Collections.emptyList; import static java.util.Collections.singletonList; import static org.apache.kafka.streams.KafkaStreams.fetchEndOffsets; +import static org.apache.kafka.streams.KafkaStreams.fetchEndOffsetsWithoutTimeout; import static org.easymock.EasyMock.anyInt; import static org.easymock.EasyMock.anyLong; import static org.easymock.EasyMock.anyObject; @@ -896,7 +898,7 @@ public void fetchEndOffsetsShouldRethrowRuntimeExceptionAsStreamsException() { final Admin adminClient = 
EasyMock.createMock(AdminClient.class); EasyMock.expect(adminClient.listOffsets(EasyMock.anyObject())).andThrow(new RuntimeException()); replay(adminClient); - assertThrows(StreamsException.class, () -> fetchEndOffsets(emptyList(), adminClient)); + assertThrows(StreamsException.class, () -> fetchEndOffsetsWithoutTimeout(emptyList(), adminClient)); verify(adminClient); } @@ -911,7 +913,7 @@ public void fetchEndOffsetsShouldRethrowInterruptedExceptionAsStreamsException() EasyMock.expect(allFuture.get()).andThrow(new InterruptedException()); replay(adminClient, result, allFuture); - assertThrows(StreamsException.class, () -> fetchEndOffsets(emptyList(), adminClient)); + assertThrows(StreamsException.class, () -> fetchEndOffsetsWithoutTimeout(emptyList(), adminClient)); verify(adminClient); } @@ -926,7 +928,22 @@ public void fetchEndOffsetsShouldRethrowExecutionExceptionAsStreamsException() t EasyMock.expect(allFuture.get()).andThrow(new ExecutionException(new RuntimeException())); replay(adminClient, result, allFuture); - assertThrows(StreamsException.class, () -> fetchEndOffsets(emptyList(), adminClient)); + assertThrows(StreamsException.class, () -> fetchEndOffsetsWithoutTimeout(emptyList(), adminClient)); + verify(adminClient); + } + + @Test + public void fetchEndOffsetsWithTimeoutShouldRethrowTimeoutExceptionAsStreamsException() throws InterruptedException, ExecutionException, TimeoutException { + final Admin adminClient = EasyMock.createMock(AdminClient.class); + final ListOffsetsResult result = EasyMock.createNiceMock(ListOffsetsResult.class); + final KafkaFuture> allFuture = EasyMock.createMock(KafkaFuture.class); + + EasyMock.expect(adminClient.listOffsets(EasyMock.anyObject())).andStubReturn(result); + EasyMock.expect(result.all()).andStubReturn(allFuture); + EasyMock.expect(allFuture.get(1L, TimeUnit.MILLISECONDS)).andThrow(new TimeoutException()); + replay(adminClient, result, allFuture); + + assertThrows(StreamsException.class, () -> 
fetchEndOffsets(emptyList(), adminClient, Duration.ofMillis(1))); verify(adminClient); } diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java index cc40ad7501687..418e13f70b476 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java @@ -20,6 +20,7 @@ import java.util.SortedSet; import org.apache.kafka.clients.admin.Admin; import org.apache.kafka.clients.admin.AdminClient; +import org.apache.kafka.clients.admin.AdminClientConfig; import org.apache.kafka.clients.admin.ListOffsetsResult; import org.apache.kafka.clients.admin.ListOffsetsResult.ListOffsetsResultInfo; import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor.Assignment; @@ -47,6 +48,7 @@ import org.apache.kafka.streams.processor.TaskId; import org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor.RankedClient; import org.apache.kafka.streams.processor.internals.assignment.AssignmentInfo; +import org.apache.kafka.streams.processor.internals.assignment.AssignorConfiguration; import org.apache.kafka.streams.processor.internals.assignment.AssignorError; import org.apache.kafka.streams.processor.internals.assignment.ClientState; import org.apache.kafka.streams.processor.internals.assignment.SubscriptionInfo; @@ -1824,6 +1826,17 @@ public void shouldGetAssignmentConfigs() { assertThat(partitionAssignor.probingRebalanceIntervalMs(), equalTo(55 * 60 * 1000L)); } + @Test + public void shouldSetAdminClientTimeout() { + createDefaultMockTaskManager(); + + final Map props = configProps(); + props.put(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, 2 * 60 * 1000); + final AssignorConfiguration assignorConfiguration = new AssignorConfiguration(props); + + 
assertThat(assignorConfiguration.getAdminClientTimeout(), is(2 * 60 * 1000)); + } + @Test public void shouldRankPreviousClientAboveEquallyCaughtUpClient() { final ClientState client1 = EasyMock.createMock(ClientState.class); From bb5ea3000bb0e02413da92f2a7cf162809e4d31b Mon Sep 17 00:00:00 2001 From: ableegoldman Date: Fri, 20 Mar 2020 15:46:07 -0700 Subject: [PATCH 6/7] remove extra space --- clients/src/main/java/org/apache/kafka/clients/admin/Admin.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/Admin.java b/clients/src/main/java/org/apache/kafka/clients/admin/Admin.java index f10679bd4dc63..b99fd5ff58c39 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/Admin.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/Admin.java @@ -1119,7 +1119,7 @@ default AlterConsumerGroupOffsetsResult alterConsumerGroupOffsets(String groupId * @param topicPartitionOffsets The mapping from partition to the OffsetSpec to look up. * @return The ListOffsetsResult. 
*/ - default ListOffsetsResult listOffsets(Map topicPartitionOffsets) { + default ListOffsetsResult listOffsets(Map topicPartitionOffsets) { return listOffsets(topicPartitionOffsets, new ListOffsetsOptions()); } From e3dc2a52378efe7bb111982c725760bb4e7a1e9d Mon Sep 17 00:00:00 2001 From: ableegoldman Date: Fri, 20 Mar 2020 16:32:04 -0700 Subject: [PATCH 7/7] unbox assignment configs --- .../processor/internals/StreamsPartitionAssignor.java | 10 +++++----- .../internals/assignment/AssignorConfiguration.java | 10 +++++----- 2 files changed, 10 insertions(+), 10 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java index 02d0cebbca4bd..2d1d3de8c0899 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java @@ -1606,23 +1606,23 @@ protected void handleRebalanceStart(final Set topics) { taskManager.handleRebalanceStart(topics); } - Long acceptableRecoveryLag() { + long acceptableRecoveryLag() { return assignmentConfigs.acceptableRecoveryLag; } - Integer balanceFactor() { + int balanceFactor() { return assignmentConfigs.balanceFactor; } - Integer maxWarmupReplicas() { + int maxWarmupReplicas() { return assignmentConfigs.maxWarmupReplicas; } - Integer numStandbyReplicas() { + int numStandbyReplicas() { return assignmentConfigs.numStandbyReplicas; } - Long probingRebalanceIntervalMs() { + long probingRebalanceIntervalMs() { return assignmentConfigs.probingRebalanceIntervalMs; } diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignorConfiguration.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignorConfiguration.java index ad16d0ac8acad..9086479c62265 100644 --- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignorConfiguration.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignorConfiguration.java @@ -276,11 +276,11 @@ public AssignmentConfigs getAssignmentConfigs() { } public static class AssignmentConfigs { - public final Long acceptableRecoveryLag; - public final Integer balanceFactor; - public final Integer maxWarmupReplicas; - public final Integer numStandbyReplicas; - public final Long probingRebalanceIntervalMs; + public final long acceptableRecoveryLag; + public final int balanceFactor; + public final int maxWarmupReplicas; + public final int numStandbyReplicas; + public final long probingRebalanceIntervalMs; AssignmentConfigs(final StreamsConfig configs) { acceptableRecoveryLag = configs.getLong(StreamsConfig.ACCEPTABLE_RECOVERY_LAG_CONFIG);