diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java index 350a84bf02268..1c2d607713498 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java @@ -888,9 +888,9 @@ public void run() { long now = time.milliseconds(); if (coordinatorUnknown()) { - if (findCoordinatorFuture == null) - lookupCoordinator(); - else + if (findCoordinatorFuture != null || lookupCoordinator().failed()) + // the immediate future check ensures that we backoff properly in the case that no + // brokers are available to connect to. AbstractCoordinator.this.wait(retryBackoffMs); } else if (heartbeat.sessionTimeoutExpired(now)) { // the session timeout has expired without seeing a successful heartbeat, so we should @@ -941,7 +941,7 @@ public void onFailure(RuntimeException e) { log.error("Unexpected interrupt received in heartbeat thread for group {}", groupId, e); this.failed.set(new RuntimeException(e)); } catch (RuntimeException e) { - log.error("Heartbeat thread for group {} failed due to unexpected error" , groupId, e); + log.error("Heartbeat thread for group {} failed due to unexpected error", groupId, e); this.failed.set(e); } finally { log.debug("Heartbeat thread for group {} has closed", groupId);