diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java index a9ff35f0087cc..a142196230505 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java @@ -185,6 +185,7 @@ public boolean hasPatternSubscription() { public void unsubscribe() { this.subscription.clear(); + this.userAssignment.clear(); this.assignment.clear(); this.needsPartitionAssignment = true; this.subscribedPattern = null; diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java index c5fce616070b7..6566025fe30a4 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java @@ -176,6 +176,22 @@ public void patternSubscription() { "Expected subscribed topics count is incorrect", 2, state.subscription().size()); } + @Test + public void unsubscribeUserAssignment() { + state.assignFromUser(Arrays.asList(tp0, tp1)); + state.unsubscribe(); + state.subscribe(Arrays.asList(topic), rebalanceListener); + assertEquals(Collections.singleton(topic), state.subscription()); + } + + @Test + public void unsubscribeUserSubscribe() { + state.subscribe(Arrays.asList(topic), rebalanceListener); + state.unsubscribe(); + state.assignFromUser(Arrays.asList(tp0)); + assertEquals(Collections.singleton(tp0), state.assignedPartitions()); + } + @Test public void unsubscription() { state.subscribe(Pattern.compile(".*"), rebalanceListener);