diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java index 7341826d515e7..876a51d605dab 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java @@ -1033,8 +1033,11 @@ private static class MetadataSnapshot { private MetadataSnapshot(SubscriptionState subscription, Cluster cluster, int version) { Map partitionsPerTopic = new HashMap<>(); - for (String topic : subscription.groupSubscription()) - partitionsPerTopic.put(topic, cluster.partitionCountForTopic(topic)); + for (String topic : subscription.groupSubscription()) { + Integer numPartitions = cluster.partitionCountForTopic(topic); + if (numPartitions != null) + partitionsPerTopic.put(topic, numPartitions); + } this.partitionsPerTopic = partitionsPerTopic; this.version = version; } diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java index 450467420588d..ae15f2feab7d5 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java @@ -618,8 +618,13 @@ public synchronized void resume(TopicPartition tp) { } synchronized void requestFailed(Set partitions, long nextRetryTimeMs) { - for (TopicPartition partition : partitions) - assignedState(partition).requestFailed(nextRetryTimeMs); + for (TopicPartition partition : partitions) { + // by the time the request failed, the assignment may no longer + // contain this partition any more, in which case we would just ignore. + final TopicPartitionState state = assignedStateOrNull(partition); + if (state != null) + state.requestFailed(nextRetryTimeMs); + } } synchronized void movePartitionToEnd(TopicPartition tp) { diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java index 484b9de0a9bce..217b3ce680c73 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java @@ -62,7 +62,7 @@ public void partitionAssignment() { state.seek(tp0, 1); assertTrue(state.isFetchable(tp0)); assertEquals(1L, state.position(tp0).offset); - state.assignFromUser(Collections.emptySet()); + state.assignFromUser(Collections.emptySet()); assertTrue(state.assignedPartitions().isEmpty()); assertEquals(0, state.numAssignedPartitions()); assertFalse(state.isAssigned(tp0));