diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java index 4123914c6c9e2..01628b733060c 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java @@ -30,6 +30,7 @@ import org.apache.kafka.common.errors.MemberIdRequiredException; import org.apache.kafka.common.errors.RebalanceInProgressException; import org.apache.kafka.common.errors.RetriableException; +import org.apache.kafka.common.errors.TimeoutException; import org.apache.kafka.common.errors.UnknownMemberIdException; import org.apache.kafka.common.message.FindCoordinatorRequestData; import org.apache.kafka.common.message.HeartbeatRequestData; @@ -887,8 +888,8 @@ public void onFailure(RuntimeException e, RequestFuture future) { if (e instanceof NoBatchedFindCoordinatorsException) { batchFindCoordinator = false; - clearFindCoordinatorFuture(); - lookupCoordinator(); + // make a fast failure to send next request without batching + super.onFailure(new TimeoutException(e), future); return; } if (!(e instanceof RetriableException)) { diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java index b97fb789faf27..00d77ed8da243 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java @@ -18,6 +18,7 @@ import org.apache.kafka.clients.GroupRebalanceConfig; import org.apache.kafka.clients.MockClient; +import org.apache.kafka.clients.NodeApiVersions; import org.apache.kafka.clients.consumer.OffsetResetStrategy; import org.apache.kafka.common.Node; import org.apache.kafka.common.errors.AuthenticationException; @@ -35,6 +36,7 @@ import org.apache.kafka.common.message.SyncGroupResponseData; import org.apache.kafka.common.metrics.KafkaMetric; import org.apache.kafka.common.metrics.Metrics; +import org.apache.kafka.common.protocol.ApiKeys; import org.apache.kafka.common.protocol.Errors; import org.apache.kafka.common.requests.AbstractRequest; import org.apache.kafka.common.requests.FindCoordinatorResponse; @@ -55,6 +57,7 @@ import org.apache.kafka.test.TestUtils; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.Timeout; import java.nio.ByteBuffer; import java.util.Arrays; @@ -1028,6 +1031,22 @@ public void testLookupCoordinator() { assertNotSame(future, coordinator.lookupCoordinator(), "New request not sent after previous completed"); } + @Timeout(10) + @Test + public void pollShouldBeDoneByNoBatchedFindCoordinatorsException() throws InterruptedException { + setupCoordinator(); + mockClient.setNodeApiVersions(NodeApiVersions.create(ApiKeys.FIND_COORDINATOR.id, (short) 3, (short) 3)); + RequestFuture future = coordinator.lookupCoordinator(); + assertFalse(future.isDone()); + // first response is replaced by UnsupportedVersionException + mockClient.prepareResponse(groupCoordinatorResponse(node, Errors.NONE)); + // second response is what mock client pass to network client + mockClient.prepareResponse(groupCoordinatorResponse(node, Errors.NONE)); + coordinator.ensureCoordinatorReady(mockTime.timer(10)); + assertTrue(future.isDone()); + assertTrue(future.failed()); + } + @Test public void testWakeupAfterJoinGroupSent() throws Exception { setupCoordinator();