diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java index 3d94572f3856f..3f3c1233da006 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java @@ -393,7 +393,7 @@ private Map errorAssignment(final Map public Map assign(final Cluster metadata, final Map subscriptions) { // construct the client metadata from the decoded subscription info - final Map clientsMetadata = new HashMap<>(); + final Map clientMetadataMap = new HashMap<>(); final Set futureConsumers = new HashSet<>(); int minReceivedMetadataVersion = SubscriptionInfo.LATEST_SUPPORTED_VERSION; @@ -417,11 +417,11 @@ public Map assign(final Cluster metadata, } // create the new client metadata if necessary - ClientMetadata clientMetadata = clientsMetadata.get(info.processId()); + ClientMetadata clientMetadata = clientMetadataMap.get(info.processId()); if (clientMetadata == null) { clientMetadata = new ClientMetadata(info.userEndPoint()); - clientsMetadata.put(info.processId(), clientMetadata); + clientMetadataMap.put(info.processId(), clientMetadata); } // add the consumer to the client @@ -449,7 +449,7 @@ public Map assign(final Cluster metadata, SubscriptionInfo.LATEST_SUPPORTED_VERSION); } - log.debug("Constructed client metadata {} from the member subscriptions.", clientsMetadata); + log.debug("Constructed client metadata {} from the member subscriptions.", clientMetadataMap); // ---------------- Step Zero ---------------- // @@ -465,7 +465,7 @@ public Map assign(final Cluster metadata, !metadata.topics().contains(topic)) { log.error("Missing source topic {} durign assignment. Returning error {}.", topic, Error.INCOMPLETE_SOURCE_TOPIC_METADATA.name()); - return errorAssignment(clientsMetadata, topic, Error.INCOMPLETE_SOURCE_TOPIC_METADATA.code); + return errorAssignment(clientMetadataMap, topic, Error.INCOMPLETE_SOURCE_TOPIC_METADATA.code); } } for (final InternalTopicConfig topic: topicsInfo.repartitionSourceTopics.values()) { @@ -623,7 +623,7 @@ public Map assign(final Cluster metadata, // assign tasks to clients final Map states = new HashMap<>(); - for (final Map.Entry entry : clientsMetadata.entrySet()) { + for (final Map.Entry entry : clientMetadataMap.entrySet()) { states.put(entry.getKey(), entry.getValue().state); } @@ -640,7 +640,7 @@ public Map assign(final Cluster metadata, // construct the global partition assignment per host map final Map> partitionsByHostState = new HashMap<>(); if (minReceivedMetadataVersion >= 2) { - for (final Map.Entry entry : clientsMetadata.entrySet()) { + for (final Map.Entry entry : clientMetadataMap.entrySet()) { final HostInfo hostInfo = entry.getValue().hostInfo; if (hostInfo != null) { @@ -659,9 +659,9 @@ public Map assign(final Cluster metadata, final Map assignment; if (versionProbing) { - assignment = versionProbingAssignment(clientsMetadata, partitionsForTask, partitionsByHostState, futureConsumers, minReceivedMetadataVersion); + assignment = versionProbingAssignment(clientMetadataMap, partitionsForTask, partitionsByHostState, futureConsumers, minReceivedMetadataVersion); } else { - assignment = computeNewAssignment(clientsMetadata, partitionsForTask, partitionsByHostState, minReceivedMetadataVersion); + assignment = computeNewAssignment(clientMetadataMap, partitionsForTask, partitionsByHostState, minReceivedMetadataVersion); } return assignment;