From aea92e0e8e5f514d193ccf38d93006509543553f Mon Sep 17 00:00:00 2001 From: Guozhang Wang Date: Fri, 30 Mar 2018 16:10:37 -0700 Subject: [PATCH 1/2] enforce rebalance --- .../kafka/streams/processor/internals/StreamThread.java | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java index fb9b8e058a08c..8c6752cb5cf43 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java @@ -749,6 +749,15 @@ private void runLoop() { try { recordsProcessedBeforeCommit = runOnce(recordsProcessedBeforeCommit); } catch (final TaskMigratedException ignoreAndRejoinGroup) { + log.warn("Detected task {} that got migrated to another thread. " + + "This implies that this thread missed a rebalance and dropped out of the consumer group. " + + "Will try to rejoin the consumer group. Below is the detailed description of the task:\n{}", + ignoreAndRejoinGroup.migratedTask().id(), ignoreAndRejoinGroup.migratedTask().toString(">")); + + // re-subscribe to enforce a rebalance in the next poll call + consumer.unsubscribe(); + consumer.subscribe(builder.sourceTopicPattern(), rebalanceListener); + log.warn("Detected a task that got migrated to another thread. " + "This implies that this thread missed a rebalance and dropped out of the consumer group. " + "Trying to rejoin the consumer group now.", ignoreAndRejoinGroup); From 568fd37be2eec2a8479c5b744c97ff216f48fd26 Mon Sep 17 00:00:00 2001 From: Guozhang Wang Date: Fri, 30 Mar 2018 16:18:14 -0700 Subject: [PATCH 2/2] improve on error message --- .../streams/errors/TaskMigratedException.java | 21 +++++++++++++------ .../processor/internals/StreamThread.java | 10 +++------ 2 files changed, 18 insertions(+), 13 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/errors/TaskMigratedException.java b/streams/src/main/java/org/apache/kafka/streams/errors/TaskMigratedException.java index f2fa5942d8ca7..425498308e2d9 100644 --- a/streams/src/main/java/org/apache/kafka/streams/errors/TaskMigratedException.java +++ b/streams/src/main/java/org/apache/kafka/streams/errors/TaskMigratedException.java @@ -28,6 +28,8 @@ public class TaskMigratedException extends StreamsException { private final static long serialVersionUID = 1L; + private final Task task; + public TaskMigratedException(final Task task) { this(task, null); } @@ -36,17 +38,24 @@ public TaskMigratedException(final Task task, final TopicPartition topicPartition, final long endOffset, final long pos) { - super(String.format("Log end offset of %s should not change while restoring: old end offset %d, current offset %d%n%s", - topicPartition, - endOffset, - pos, - task.toString("> ")), - null); + super(String.format("Log end offset of %s should not change while restoring: old end offset %d, current offset %d", + topicPartition, + endOffset, + pos), + null); + + this.task = task; } public TaskMigratedException(final Task task, final Throwable throwable) { super(task.toString(), throwable); + + this.task = task; + } + + public Task migratedTask() { + return task; } } diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java index 8c6752cb5cf43..2ba66a5b1bd98 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java @@ -749,18 +749,14 @@ private void runLoop() { try { recordsProcessedBeforeCommit = runOnce(recordsProcessedBeforeCommit); } catch (final TaskMigratedException ignoreAndRejoinGroup) { - log.warn("Detected task {} that got migrated to another thread. " + - "This implies that this thread missed a rebalance and dropped out of the consumer group. " + + log.warn("Detected task {} that got migrated to another thread: {} " + + "This implies that the thread may have missed a rebalance and dropped out of the consumer group. " + "Will try to rejoin the consumer group. Below is the detailed description of the task:\n{}", - ignoreAndRejoinGroup.migratedTask().id(), ignoreAndRejoinGroup.migratedTask().toString(">")); + ignoreAndRejoinGroup.migratedTask().id(), ignoreAndRejoinGroup.getMessage(), ignoreAndRejoinGroup.migratedTask().toString(">")); // re-subscribe to enforce a rebalance in the next poll call consumer.unsubscribe(); consumer.subscribe(builder.sourceTopicPattern(), rebalanceListener); - - log.warn("Detected a task that got migrated to another thread. " + - "This implies that this thread missed a rebalance and dropped out of the consumer group. " + - "Trying to rejoin the consumer group now.", ignoreAndRejoinGroup); } } }