diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java index d3616f9cb83d7..89b2f0b6dbfeb 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java @@ -760,7 +760,7 @@ public void unsubscribe() { try { log.debug("Unsubscribed all topics or patterns and assigned partitions"); this.subscriptions.unsubscribe(); - this.coordinator.maybeLeaveGroup(false); + this.coordinator.maybeLeaveGroup(); this.metadata.needMetadataForAllTopics(false); } finally { release(); diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java index 5b5c8a52d9274..a12c6c1f90ccc 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java @@ -540,18 +540,18 @@ protected void coordinatorDead() { @Override public void close() { client.disableWakeups(); - maybeLeaveGroup(true); + maybeLeaveGroup(); } /** * Leave the current group and reset local generation/memberId. */ - public void maybeLeaveGroup(boolean awaitResponse) { + public void maybeLeaveGroup() { client.unschedule(heartbeatTask); if (!coordinatorUnknown() && generation > 0) { // this is a minimal effort attempt to leave the group. we do not // attempt any resending if the request fails or times out. - sendLeaveGroupRequest(awaitResponse); + sendLeaveGroupRequest(); } this.generation = OffsetCommitRequest.DEFAULT_GENERATION_ID; @@ -559,7 +559,7 @@ public void maybeLeaveGroup(boolean awaitResponse) { rejoinNeeded = true; } - private void sendLeaveGroupRequest(boolean awaitResponse) { + private void sendLeaveGroupRequest() { LeaveGroupRequest request = new LeaveGroupRequest(groupId, memberId); RequestFuture future = client.send(coordinator, ApiKeys.LEAVE_GROUP, request) .compose(new LeaveGroupResponseHandler()); @@ -574,10 +574,7 @@ public void onFailure(RuntimeException e) { } }); - if (awaitResponse) - client.poll(future); - else - client.poll(future, 0); + client.poll(future, 0); } private class LeaveGroupResponseHandler extends CoordinatorResponseHandler { diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java index 86ac6b3723f78..500aaed914008 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java @@ -418,7 +418,7 @@ public boolean matches(ClientRequest request) { leaveRequest.groupId().equals(groupId); } }, new LeaveGroupResponse(Errors.NONE.code()).toStruct()); - coordinator.maybeLeaveGroup(false); + coordinator.maybeLeaveGroup(); assertTrue(received.get()); assertEquals(JoinGroupRequest.UNKNOWN_MEMBER_ID, coordinator.memberId); assertEquals(OffsetCommitRequest.DEFAULT_GENERATION_ID, coordinator.generation); @@ -672,7 +672,7 @@ public void testCommitAfterLeaveGroup() { // now switch to manual assignment client.prepareResponse(new LeaveGroupResponse(Errors.NONE.code()).toStruct()); subscriptions.unsubscribe(); - coordinator.maybeLeaveGroup(false); + coordinator.maybeLeaveGroup(); subscriptions.assignFromUser(Arrays.asList(tp)); // the client should not reuse generation/memberId from auto-subscribed generation