Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ public class TaskMigratedException extends StreamsException {

private final static long serialVersionUID = 1L;

private final Task task;

public TaskMigratedException(final Task task) {
this(task, null);
}
Expand All @@ -36,17 +38,24 @@ public TaskMigratedException(final Task task,
final TopicPartition topicPartition,
final long endOffset,
final long pos) {
super(String.format("Log end offset of %s should not change while restoring: old end offset %d, current offset %d%n%s",
topicPartition,
endOffset,
pos,
task.toString("> ")),
null);
super(String.format("Log end offset of %s should not change while restoring: old end offset %d, current offset %d",
topicPartition,
endOffset,
pos),
null);

this.task = task;
}

public TaskMigratedException(final Task task,
final Throwable throwable) {
super(task.toString(), throwable);

this.task = task;
}

public Task migratedTask() {
return task;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -749,9 +749,14 @@ private void runLoop() {
try {
recordsProcessedBeforeCommit = runOnce(recordsProcessedBeforeCommit);
} catch (final TaskMigratedException ignoreAndRejoinGroup) {
log.warn("Detected a task that got migrated to another thread. " +
"This implies that this thread missed a rebalance and dropped out of the consumer group. " +
"Trying to rejoin the consumer group now.", ignoreAndRejoinGroup);
log.warn("Detected task {} that got migrated to another thread: {} " +
"This implies that the thread may have missed a rebalance and dropped out of the consumer group. " +
"Will try to rejoin the consumer group. Below is the detailed description of the task:\n{}",
ignoreAndRejoinGroup.migratedTask().id(), ignoreAndRejoinGroup.getMessage(), ignoreAndRejoinGroup.migratedTask().toString(">"));

// re-subscribe to enforce a rebalance in the next poll call
consumer.unsubscribe();
consumer.subscribe(builder.sourceTopicPattern(), rebalanceListener);
}
}
}
Expand Down