From 2be7ae9bdc4447ea9c805570cc149a5bf15631a0 Mon Sep 17 00:00:00 2001 From: Jason Gustafson Date: Thu, 5 Nov 2015 20:09:16 -0800 Subject: [PATCH] HOTFIX: unsubscribe does not clear user assignment properly --- .../consumer/internals/SubscriptionState.java | 1 + .../internals/SubscriptionStateTest.java | 16 ++++++++++++++++ 2 files changed, 17 insertions(+) diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java index a9ff35f0087cc..a142196230505 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java @@ -185,6 +185,7 @@ public boolean hasPatternSubscription() { public void unsubscribe() { this.subscription.clear(); + this.userAssignment.clear(); this.assignment.clear(); this.needsPartitionAssignment = true; this.subscribedPattern = null; diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java index c5fce616070b7..6566025fe30a4 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java @@ -176,6 +176,22 @@ public void patternSubscription() { "Expected subscribed topics count is incorrect", 2, state.subscription().size()); } + @Test + public void unsubscribeUserAssignment() { + state.assignFromUser(Arrays.asList(tp0, tp1)); + state.unsubscribe(); + state.subscribe(Arrays.asList(topic), rebalanceListener); + assertEquals(Collections.singleton(topic), state.subscription()); + } + + @Test + public void unsubscribeUserSubscribe() { + state.subscribe(Arrays.asList(topic), rebalanceListener); + state.unsubscribe(); + state.assignFromUser(Arrays.asList(tp0)); + assertEquals(Collections.singleton(tp0), state.assignedPartitions()); + } + @Test public void unsubscription() { state.subscribe(Pattern.compile(".*"), rebalanceListener);