diff --git a/streams/src/main/java/org/apache/kafka/streams/errors/TaskMigratedException.java b/streams/src/main/java/org/apache/kafka/streams/errors/TaskMigratedException.java index f2fa5942d8ca7..425498308e2d9 100644 --- a/streams/src/main/java/org/apache/kafka/streams/errors/TaskMigratedException.java +++ b/streams/src/main/java/org/apache/kafka/streams/errors/TaskMigratedException.java @@ -28,6 +28,8 @@ public class TaskMigratedException extends StreamsException { private final static long serialVersionUID = 1L; + private final Task task; + public TaskMigratedException(final Task task) { this(task, null); } @@ -36,17 +38,24 @@ public TaskMigratedException(final Task task, final TopicPartition topicPartition, final long endOffset, final long pos) { - super(String.format("Log end offset of %s should not change while restoring: old end offset %d, current offset %d%n%s", - topicPartition, - endOffset, - pos, - task.toString("> ")), - null); + super(String.format("Log end offset of %s should not change while restoring: old end offset %d, current offset %d", + topicPartition, + endOffset, + pos), + null); + + this.task = task; } public TaskMigratedException(final Task task, final Throwable throwable) { super(task.toString(), throwable); + + this.task = task; + } + + public Task migratedTask() { + return task; } } diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java index fb9b8e058a08c..2ba66a5b1bd98 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java @@ -749,9 +749,14 @@ private void runLoop() { try { recordsProcessedBeforeCommit = runOnce(recordsProcessedBeforeCommit); } catch (final TaskMigratedException ignoreAndRejoinGroup) { - log.warn("Detected a task that got migrated to another thread. " + - "This implies that this thread missed a rebalance and dropped out of the consumer group. " + - "Trying to rejoin the consumer group now.", ignoreAndRejoinGroup); + log.warn("Detected task {} that got migrated to another thread: {} " + + "This implies that the thread may have missed a rebalance and dropped out of the consumer group. " + + "Will try to rejoin the consumer group. Below is the detailed description of the task:\n{}", + ignoreAndRejoinGroup.migratedTask().id(), ignoreAndRejoinGroup.getMessage(), ignoreAndRejoinGroup.migratedTask().toString(">")); + + // re-subscribe to enforce a rebalance in the next poll call + consumer.unsubscribe(); + consumer.subscribe(builder.sourceTopicPattern(), rebalanceListener); } } }