diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java index 0d6a42ba98e44..ca7b9f10bfe0b 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java @@ -193,6 +193,10 @@ public State state() { return state; } + void setPartitionAssignedTime(final long lastPartitionAssignedMs) { + this.lastPartitionAssignedMs = lastPartitionAssignedMs; + } + /** * Sets the state * @@ -273,6 +277,7 @@ public boolean isRunning() { private long now; private long lastPollMs; private long lastCommitMs; + private long lastPartitionAssignedMs = -1L; private int numIterations; private volatile State state = State.CREATED; private volatile ThreadMetadata threadMetadata; @@ -777,7 +782,8 @@ private void initializeAndRestorePhase() { if (taskManager.tryToCompleteRestoration(now)) { changelogReader.transitToUpdateStandby(); - + log.info("Restoration took {} ms for all tasks {}", time.milliseconds() - lastPartitionAssignedMs, + taskManager.tasks().keySet()); setState(State.RUNNING); } diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsRebalanceListener.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsRebalanceListener.java index eb0fe013248db..ab8a5ff86e976 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsRebalanceListener.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsRebalanceListener.java @@ -73,6 +73,7 @@ public void onPartitionsAssigned(final Collection partitions) { } streamThread.setState(State.PARTITIONS_ASSIGNED); + streamThread.setPartitionAssignedTime(time.milliseconds()); taskManager.handleRebalanceComplete(); } diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java index 593717b1b97f0..1bf06ba67c92c 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java @@ -505,7 +505,7 @@ boolean tryToCompleteRestoration(final long now) { task.maybeInitTaskTimeoutOrThrow(now, timeoutException); log.debug( String.format( - "Could not complete restoration for %s due to the follosing exception; will retry", + "Could not complete restoration for %s due to the following exception; will retry", task.id()), timeoutException ); diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsRebalanceListenerTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsRebalanceListenerTest.java index 3bdf83757b999..31177dd848a80 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsRebalanceListenerTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsRebalanceListenerTest.java @@ -45,8 +45,9 @@ public class StreamsRebalanceListenerTest { private final TaskManager taskManager = mock(TaskManager.class); private final StreamThread streamThread = mock(StreamThread.class); private final AtomicInteger assignmentErrorCode = new AtomicInteger(); + private final MockTime time = new MockTime(); private final StreamsRebalanceListener streamsRebalanceListener = new StreamsRebalanceListener( - new MockTime(), + time, taskManager, streamThread, LoggerFactory.getLogger(StreamsRebalanceListenerTest.class), @@ -78,6 +79,7 @@ public void shouldThrowMissingSourceTopicException() { @Test public void shouldSwallowVersionProbingError() { expect(streamThread.setState(State.PARTITIONS_ASSIGNED)).andStubReturn(State.PARTITIONS_REVOKED); + streamThread.setPartitionAssignedTime(time.milliseconds()); taskManager.handleRebalanceComplete(); replay(taskManager, streamThread); assignmentErrorCode.set(AssignorError.VERSION_PROBING.code()); @@ -129,6 +131,8 @@ public void shouldThrowTaskAssignmentExceptionOnUnrecognizedErrorCode() { public void shouldHandleAssignedPartitions() { taskManager.handleRebalanceComplete(); expect(streamThread.setState(State.PARTITIONS_ASSIGNED)).andReturn(State.RUNNING); + streamThread.setPartitionAssignedTime(time.milliseconds()); + replay(taskManager, streamThread); assignmentErrorCode.set(AssignorError.NONE.code());