From 354c3bf384c7f6e64d1659fbdf9197821f8239f9 Mon Sep 17 00:00:00 2001 From: Guozhang Wang Date: Thu, 31 Oct 2019 17:51:31 -0700 Subject: [PATCH 1/2] cherry-pick from trunk --- .../clients/consumer/internals/SubscriptionState.java | 9 +++++++-- .../consumer/internals/SubscriptionStateTest.java | 2 +- 2 files changed, 8 insertions(+), 3 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java index 450467420588d..ae15f2feab7d5 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java @@ -618,8 +618,13 @@ public synchronized void resume(TopicPartition tp) { } synchronized void requestFailed(Set partitions, long nextRetryTimeMs) { - for (TopicPartition partition : partitions) - assignedState(partition).requestFailed(nextRetryTimeMs); + for (TopicPartition partition : partitions) { + // by the time the request failed, the assignment may no longer + // contain this partition any more, in which case we would just ignore. + final TopicPartitionState state = assignedStateOrNull(partition); + if (state != null) + state.requestFailed(nextRetryTimeMs); + } } synchronized void movePartitionToEnd(TopicPartition tp) { diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java index 484b9de0a9bce..217b3ce680c73 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java @@ -62,7 +62,7 @@ public void partitionAssignment() { state.seek(tp0, 1); assertTrue(state.isFetchable(tp0)); assertEquals(1L, state.position(tp0).offset); - state.assignFromUser(Collections.emptySet()); + state.assignFromUser(Collections.emptySet()); assertTrue(state.assignedPartitions().isEmpty()); assertEquals(0, state.numAssignedPartitions()); assertFalse(state.isAssigned(tp0)); From 59ef902f8157b1625c66353dbbb8b4657acb42de Mon Sep 17 00:00:00 2001 From: Guozhang Wang Date: Sat, 2 Nov 2019 10:47:09 -0700 Subject: [PATCH 2/2] add another bug fix --- .../clients/consumer/internals/ConsumerCoordinator.java | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java index 7341826d515e7..876a51d605dab 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java @@ -1033,8 +1033,11 @@ private static class MetadataSnapshot { private MetadataSnapshot(SubscriptionState subscription, Cluster cluster, int version) { Map partitionsPerTopic = new HashMap<>(); - for (String topic : subscription.groupSubscription()) - partitionsPerTopic.put(topic, cluster.partitionCountForTopic(topic)); + for (String topic : subscription.groupSubscription()) { + Integer numPartitions = cluster.partitionCountForTopic(topic); + if (numPartitions != null) + partitionsPerTopic.put(topic, numPartitions); + } this.partitionsPerTopic = partitionsPerTopic; this.version = version; }