From 7500062262e9dceddfa9bc6413fe815b50c73c11 Mon Sep 17 00:00:00 2001 From: Allen Xiang Date: Fri, 17 Feb 2017 14:04:02 -0600 Subject: [PATCH 1/6] KAFKA-4777 fix client heartbeat non-stop retry issue. --- .../clients/consumer/internals/AbstractCoordinator.java | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java index 350a84bf02268..853c1fb6285e2 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java @@ -888,8 +888,12 @@ public void run() { long now = time.milliseconds(); if (coordinatorUnknown()) { - if (findCoordinatorFuture == null) + if (findCoordinatorFuture == null) { lookupCoordinator(); + // coordinator may still be unknown, instead of an immediate retry, let's wait for a bit. + if (coordinatorUnknown()) + AbstractCoordinator.this.wait(retryBackoffMs); + } else AbstractCoordinator.this.wait(retryBackoffMs); } else if (heartbeat.sessionTimeoutExpired(now)) { From 5b7de7649f3b8cb5cfd1ad7117d82d1a17d038c7 Mon Sep 17 00:00:00 2001 From: Allen Xiang Date: Fri, 17 Feb 2017 14:30:45 -0600 Subject: [PATCH 2/6] check style fix. 
--- .../clients/consumer/internals/AbstractCoordinator.java | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java index 853c1fb6285e2..fa513e233488a 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java @@ -893,8 +893,7 @@ public void run() { // coordinator may still be unknown, instead of an immediate retry, let's wait for a bit. if (coordinatorUnknown()) AbstractCoordinator.this.wait(retryBackoffMs); - } - else + } else AbstractCoordinator.this.wait(retryBackoffMs); } else if (heartbeat.sessionTimeoutExpired(now)) { // the session timeout has expired without seeing a successful heartbeat, so we should @@ -945,7 +944,7 @@ public void onFailure(RuntimeException e) { log.error("Unexpected interrupt received in heartbeat thread for group {}", groupId, e); this.failed.set(new RuntimeException(e)); } catch (RuntimeException e) { - log.error("Heartbeat thread for group {} failed due to unexpected error" , groupId, e); + log.error("Heartbeat thread for group {} failed due to unexpected error", groupId, e); this.failed.set(e); } finally { log.debug("Heartbeat thread for group {} has closed", groupId); From 7d598de60cef7f11d5050f286da2580a744cf6fb Mon Sep 17 00:00:00 2001 From: Allen Xiang Date: Fri, 17 Feb 2017 18:07:17 -0600 Subject: [PATCH 3/6] KAFKA-4777 fix client heartbeat non-stop retry issue. Code clean up. 
--- .../clients/consumer/internals/AbstractCoordinator.java | 9 +++------ 1 file changed, 3 insertions(+), 6 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java index fa513e233488a..527cdd3d602dd 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java @@ -888,13 +888,10 @@ public void run() { long now = time.milliseconds(); if (coordinatorUnknown()) { - if (findCoordinatorFuture == null) { + if (findCoordinatorFuture == null) lookupCoordinator(); - // coordinator may still be unknown, instead of an immediate retry, let's wait for a bit. - if (coordinatorUnknown()) - AbstractCoordinator.this.wait(retryBackoffMs); - } else - AbstractCoordinator.this.wait(retryBackoffMs); + // coordinator may still be unknown, instead of an immediate retry, let's wait for a bit. + AbstractCoordinator.this.wait(retryBackoffMs); } else if (heartbeat.sessionTimeoutExpired(now)) { // the session timeout has expired without seeing a successful heartbeat, so we should // probably make sure the coordinator is still healthy. From 1b0ee61a1ade8945ff833c66c7b91e6cf37d6cf7 Mon Sep 17 00:00:00 2001 From: Allen Xiang Date: Fri, 17 Feb 2017 19:41:14 -0600 Subject: [PATCH 4/6] KAFKA-4777 fix client heartbeat non-stop retry issue. Address code review comment. 
--- .../clients/consumer/internals/AbstractCoordinator.java | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java index 527cdd3d602dd..18957b14f0512 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java @@ -889,9 +889,10 @@ public void run() { if (coordinatorUnknown()) { if (findCoordinatorFuture == null) - lookupCoordinator(); - // coordinator may still be unknown, instead of an immediate retry, let's wait for a bit. - AbstractCoordinator.this.wait(retryBackoffMs); + if (lookupCoordinator().failed()) + AbstractCoordinator.this.wait(retryBackoffMs); + else + AbstractCoordinator.this.wait(retryBackoffMs); } else if (heartbeat.sessionTimeoutExpired(now)) { // the session timeout has expired without seeing a successful heartbeat, so we should // probably make sure the coordinator is still healthy. From 48f77e45b0cdf3f3175247749b3f6954e483310e Mon Sep 17 00:00:00 2001 From: Allen Xiang Date: Fri, 17 Feb 2017 19:45:40 -0600 Subject: [PATCH 5/6] KAFKA-4777 fix client heartbeat non-stop retry issue. Fix brackets. 
--- .../kafka/clients/consumer/internals/AbstractCoordinator.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java index 18957b14f0512..7735e22af5202 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java @@ -888,10 +888,10 @@ public void run() { long now = time.milliseconds(); if (coordinatorUnknown()) { - if (findCoordinatorFuture == null) + if (findCoordinatorFuture == null) { if (lookupCoordinator().failed()) AbstractCoordinator.this.wait(retryBackoffMs); - else + } else AbstractCoordinator.this.wait(retryBackoffMs); } else if (heartbeat.sessionTimeoutExpired(now)) { // the session timeout has expired without seeing a successful heartbeat, so we should From 2999c16ded3b582ae6717c00e8b61412a897d9b7 Mon Sep 17 00:00:00 2001 From: Allen Xiang Date: Sat, 18 Feb 2017 08:45:15 -0600 Subject: [PATCH 6/6] KAFKA-4777 fix client heartbeat non-stop retry issue. Address code review comment. 
--- .../clients/consumer/internals/AbstractCoordinator.java | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java index 7735e22af5202..1c2d607713498 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java @@ -888,10 +888,9 @@ public void run() { long now = time.milliseconds(); if (coordinatorUnknown()) { - if (findCoordinatorFuture == null) { - if (lookupCoordinator().failed()) - AbstractCoordinator.this.wait(retryBackoffMs); - } else + if (findCoordinatorFuture != null || lookupCoordinator().failed()) + // the immediate future check ensures that we back off properly in the case that no + // brokers are available to connect to. AbstractCoordinator.this.wait(retryBackoffMs); } else if (heartbeat.sessionTimeoutExpired(now)) { // the session timeout has expired without seeing a successful heartbeat, so we should